From 81d15478efca6fc3651c48d09dbd9202888d0a11 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Mon, 26 Sep 2022 22:31:41 +0800 Subject: [PATCH] feat: support Key_Shared subscription in old ways. --- .../pulsar/source/PulsarSourceBuilder.java | 10 ++- .../source/config/PulsarConsumerBuilder.java | 17 ---- .../config/PulsarSourceConfigUtils.java | 2 +- .../subscriber/impl/BasePulsarSubscriber.java | 31 +++++-- .../subscriber/impl/TopicListSubscriber.java | 47 ++++++----- .../impl/TopicPatternSubscriber.java | 4 +- .../enumerator/topic/TopicNameUtils.java | 4 + .../enumerator/topic/TopicPartition.java | 61 +++++++++----- .../topic/range/FixedKeysRangeGenerator.java | 31 +++++-- .../topic/range/FixedRangeGenerator.java | 16 +++- .../topic/range/FullRangeGenerator.java | 18 +++- .../topic/range/RangeGenerator.java | 39 +++++++++ .../topic/range/SplitRangeGenerator.java | 82 +++++++++++++++++++ .../topic/range/TopicRangeUtils.java | 49 ++++++++--- .../split/PulsarPartitionSplitReaderBase.java | 8 +- .../split/PulsarPartitionSplitSerializer.java | 10 ++- .../FlinkContainerWithPulsarEnvironment.java | 3 - 17 files changed, 330 insertions(+), 102 deletions(-) create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 018fe3b0aed3e..b76e2dee3cfcc 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -34,6 +34,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.SplitRangeGenerator; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -471,12 +472,15 @@ public PulsarSource build() { checkNotNull(subscriber, "No topic names or topic pattern are provided."); SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE); - if (rangeGenerator == null) { - if (subscriptionType == SubscriptionType.Key_Shared) { + if (subscriptionType == SubscriptionType.Key_Shared) { + if (rangeGenerator == null) { LOG.warn( "No range generator provided for key_shared subscription," - + " we would use the UniformRangeGenerator as the default range generator."); + + " we would use the SplitRangeGenerator as the default range generator."); + this.rangeGenerator = new SplitRangeGenerator(); } + } else { + // Override the range generator. this.rangeGenerator = new FullRangeGenerator(); } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarConsumerBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarConsumerBuilder.java index 8e417f7d4e1d3..09e4322af454c 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarConsumerBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarConsumerBuilder.java @@ -21,20 +21,16 @@ import org.apache.flink.shaded.guava30.com.google.common.base.Strings; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerInterceptor; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.client.impl.ConsumerInterceptors; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.naming.TopicName; -import javax.annotation.Nonnull; - import java.util.List; import java.util.concurrent.CompletableFuture; @@ -49,19 +45,6 @@ public PulsarConsumerBuilder(PulsarClient client, Schema schema) { super((PulsarClientImpl) client, schema); } - @Override - public ConsumerBuilder subscriptionType(@Nonnull SubscriptionType subscriptionType) { - if (subscriptionType == SubscriptionType.Key_Shared) { - // Override the key shared subscription into exclusive for making it behaviors like a - // Pulsar Reader which supports partial key hash ranges. - super.subscriptionType(SubscriptionType.Exclusive); - } else { - super.subscriptionType(subscriptionType); - } - - return this; - } - @Override public CompletableFuture> subscribeAsync() { PulsarClientImpl client = super.getClient(); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java index 895941d11b532..063d588deccba 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java @@ -77,7 +77,7 @@ public final class PulsarSourceConfigUtils { .build(); private PulsarSourceConfigUtils() { - // No need to create instance. + // No need to create an instance. } public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR = diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java index aaab3cb3f5bd3..e2ca77e4edaf9 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java @@ -23,9 +23,9 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import java.util.ArrayList; import java.util.List; @@ -40,7 +40,7 @@ public abstract class BasePulsarSubscriber implements PulsarSubscriber { protected TopicMetadata queryTopicMetadata(PulsarAdminRequest adminRequest, String topicName) { try { return adminRequest.getTopicMetadata(topicName); - } catch (NotFoundException e) { + } catch (PulsarAdminException.NotFoundException e) { return null; } catch (PulsarAdminException e) { sneakyThrow(e); @@ -49,18 +49,35 @@ protected TopicMetadata queryTopicMetadata(PulsarAdminRequest adminRequest, Stri } protected List toTopicPartitions( - TopicMetadata metadata, List ranges) { + TopicMetadata metadata, List ranges, KeySharedMode mode) { if (!metadata.isPartitioned()) { // For non-partitioned topic. - TopicPartition partition = new TopicPartition(metadata.getName(), -1, ranges); - return singletonList(partition); + return toTopicPartitions(metadata.getName(), -1, ranges, mode); } else { + // For partitioned topic. List partitions = new ArrayList<>(); for (int i = 0; i < metadata.getPartitionSize(); i++) { - partitions.add(new TopicPartition(metadata.getName(), i, ranges)); + partitions.addAll(toTopicPartitions(metadata.getName(), i, ranges, mode)); } - return partitions; } } + + protected List toTopicPartitions( + String topic, int partitionId, List ranges, KeySharedMode mode) { + switch (mode) { + case JOIN: + return singletonList(new TopicPartition(topic, partitionId, ranges, mode)); + case SPLIT: + List partitions = new ArrayList<>(ranges.size()); + for (TopicRange range : ranges) { + TopicPartition partition = + new TopicPartition(topic, partitionId, singletonList(range), mode); + partitions.add(partition); + } + return partitions; + default: + throw new UnsupportedOperationException(mode + " isn't supported."); + } + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java index 22015f4940f57..cbe1216402e87 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java @@ -26,14 +26,13 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.pulsar.common.naming.TopicName; + +import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; -import java.util.stream.Stream; -import static java.util.stream.Collectors.toSet; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned; -import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; /** the implements of consuming multiple topics. */ public class TopicListSubscriber extends BasePulsarSubscriber { @@ -61,21 +60,29 @@ public TopicListSubscriber(List topics) { @Override public Set getSubscribedTopicPartitions( PulsarAdminRequest metadataRequest, RangeGenerator rangeGenerator, int parallelism) { - Stream partitionStream = - partitions - .parallelStream() - .map(partition -> new TopicMetadata(partition, NON_PARTITIONED)); - Stream topicStream = - topics.parallelStream() - .map(topic -> queryTopicMetadata(metadataRequest, topic)) - .filter(Objects::nonNull); - - return Stream.concat(partitionStream, topicStream) - .flatMap( - metadata -> { - List ranges = rangeGenerator.range(metadata, parallelism); - return toTopicPartitions(metadata, ranges).stream(); - }) - .collect(toSet()); + Set results = new HashSet<>(); + + // Query topics from Pulsar. + for (String topic : topics) { + TopicMetadata metadata = queryTopicMetadata(metadataRequest, topic); + List ranges = rangeGenerator.range(metadata, parallelism); + RangeGenerator.KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism); + + results.addAll(toTopicPartitions(metadata, ranges, mode)); + } + + for (String partition : partitions) { + TopicName topicName = TopicName.get(partition); + String name = topicName.getPartitionedTopicName(); + int index = topicName.getPartitionIndex(); + + TopicMetadata metadata = queryTopicMetadata(metadataRequest, name); + List ranges = rangeGenerator.range(metadata, parallelism); + RangeGenerator.KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism); + + results.addAll(toTopicPartitions(name, index, ranges, mode)); + } + + return results; } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java index fc6e43efed939..868a137d7d768 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java @@ -73,7 +73,9 @@ public Set getSubscribedTopicPartitions( metadata -> { List ranges = rangeGenerator.range(metadata, parallelism); - return toTopicPartitions(metadata, ranges).stream(); + RangeGenerator.KeySharedMode mode = + rangeGenerator.keyShareMode(metadata, parallelism); + return toTopicPartitions(metadata, ranges, mode).stream(); }) .collect(toSet()); } catch (PulsarAdminException e) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index 885edcc67c36f..feac60153503c 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -75,6 +75,10 @@ public static boolean isPartitioned(String topic) { return TopicName.get(topic).isPartitioned(); } + public static int extractPartitionId(String topic) { + return TopicName.get(topic).getPartitionIndex(); + } + /** Merge the same partitions into one topic. */ public static List distinctTopics(List topics) { Set fullTopics = new HashSet<>(); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java index ed8357f10da12..9e3731e240345 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java @@ -21,19 +21,24 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.naming.TopicName; import java.io.Serializable; import java.util.List; import java.util.Objects; -import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.extractPartitionId; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.SPLIT; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -50,15 +55,17 @@ public class TopicPartition implements Serializable { */ public static final int NON_PARTITION_ID = -1; + private static final List FULL_RANGES = ImmutableList.of(createFullRange()); + /** - * The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant + * The topic name of the pulsar. It would be a full topic name. If you don't provide the tenant * and namespace, we would add them automatically. */ private final String topic; /** - * Index of partition for the topic. It would be natural number for partitioned topic with a - * non-key_shared subscription. + * Index of partition for the topic. It would be a natural number for the partitioned topic with + * a non-key_shared subscription. */ private final int partitionId; @@ -69,30 +76,33 @@ public class TopicPartition implements Serializable { */ private final List ranges; - /** Create a top-level topic without partition information. */ + /** + * The key share mode for the {@link SubscriptionType#Key_Shared}. It will be {@link + * KeySharedMode#JOIN} for other subscriptions. + */ + private final KeySharedMode mode; + public TopicPartition(String topic) { - TopicName topicName = TopicName.get(topic); - this.topic = topicName.getPartitionedTopicName(); - this.partitionId = - topicName.isPartitioned() ? topicName.getPartitionIndex() : NON_PARTITION_ID; - this.ranges = singletonList(createFullRange()); + this(topic, extractPartitionId(topic)); } - /** Create a topic partition without key hash range. */ public TopicPartition(String topic, int partitionId) { - this(topic, partitionId, singletonList(createFullRange())); + this(topic, partitionId, FULL_RANGES, SPLIT); } - @Internal public TopicPartition(String topic, int partitionId, List ranges) { - TopicName topicName = TopicName.get(topic); + this(topic, partitionId, ranges, SPLIT); + } - this.topic = topicName.getPartitionedTopicName(); + public TopicPartition( + String topic, int partitionId, List ranges, KeySharedMode mode) { + this.topic = topicName(checkNotNull(topic)); this.partitionId = - partitionId == NON_PARTITION_ID && topicName.isPartitioned() - ? topicName.getPartitionIndex() + partitionId == NON_PARTITION_ID && isPartitioned(topic) + ? extractPartitionId(topic) : partitionId; this.ranges = checkNotNull(ranges); + this.mode = mode; } public String getTopic() { @@ -120,18 +130,24 @@ public String getFullTopicName() { } } - /** This method is only internal used for serialization. */ + /** This method is internal used for serialization. */ @Internal public List getRanges() { return ranges; } - /** This method is only internal used for define key shared subscription. */ + /** This method is internal used for define key shared subscription. */ @Internal public List getPulsarRanges() { return ranges.stream().map(TopicRange::toPulsarRange).collect(toList()); } + /** This method is internal used for key shared mode. */ + @Internal + public KeySharedMode getMode() { + return mode; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -144,12 +160,13 @@ public boolean equals(Object o) { return partitionId == partition.partitionId && topic.equals(partition.topic) - && ranges.equals(partition.ranges); + && ranges.equals(partition.ranges) + && mode == partition.mode; } @Override public int hashCode() { - return Objects.hash(topic, partitionId, ranges); + return Objects.hash(topic, partitionId, ranges, mode); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java index c8adb86454c62..4cf71ab292d59 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic.range; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; @@ -39,11 +40,11 @@ /** * Pulsar didn't expose the key hash range method. We have to provide an implementation for - * end-user, You can add the keys you want to consume, no need to provide any hash ranges. + * end-user. You can add the keys you want to consume, no need to provide any hash ranges. * - *

Since we use murmur3 hash to define which message you want to consume. The consuming results - * may contain the messages with different keys comparing the keys you have defined in this range - * generator. Remember to use flink's DataStream.filter() method. + *

Since the key's hash isn't specified to only one key. The consuming results may contain the + * messages with different keys comparing the keys you have defined in this range generator. + * Remember to use flink's DataStream.filter() method. * *

Usage:

  * FixedKeysRangeGenerator.builder()
@@ -53,13 +54,16 @@
  *     .build()
  * 
*/ +@PublicEvolving public class FixedKeysRangeGenerator implements RangeGenerator { private static final long serialVersionUID = 2372969466289052100L; private final List ranges; + private final KeySharedMode sharedMode; - private FixedKeysRangeGenerator(List ranges) { + private FixedKeysRangeGenerator(List ranges, KeySharedMode sharedMode) { this.ranges = ranges; + this.sharedMode = sharedMode; } @Override @@ -67,6 +71,12 @@ public List range(TopicMetadata metadata, int parallelism) { return ranges; } + @Override + public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) { + return sharedMode; + } + + @PublicEvolving public static FixedKeysRangeGeneratorBuilder builder() { return new FixedKeysRangeGeneratorBuilder(); } @@ -75,6 +85,7 @@ public static FixedKeysRangeGeneratorBuilder builder() { public static class FixedKeysRangeGeneratorBuilder { private final SortedSet keyHashes = new TreeSet<>(); + private KeySharedMode sharedMode = KeySharedMode.JOIN; private FixedKeysRangeGeneratorBuilder() { // No public for builder @@ -127,6 +138,12 @@ public FixedKeysRangeGeneratorBuilder orderingKey(byte[] keyBytes) { return this; } + /** Override the default {@link KeySharedMode#JOIN} to the mode your have provided. */ + public FixedKeysRangeGeneratorBuilder keySharedMode(KeySharedMode sharedMode) { + this.sharedMode = sharedMode; + return this; + } + /** Create the FixedKeysRangeGenerator by the given keys. */ public FixedKeysRangeGenerator build() { List ranges = new ArrayList<>(); @@ -161,8 +178,8 @@ public FixedKeysRangeGenerator build() { ranges.add(range); } - validateTopicRanges(ranges); - return new FixedKeysRangeGenerator(ranges); + validateTopicRanges(ranges, sharedMode); + return new FixedKeysRangeGenerator(ranges, sharedMode); } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java index 09322d0da1ad4..84c41e96e0132 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic.range; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; @@ -26,18 +27,31 @@ import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges; /** Always return the same range set for all topics. */ +@PublicEvolving public class FixedRangeGenerator implements RangeGenerator { private static final long serialVersionUID = -3895203007855538734L; private final List ranges; + private final KeySharedMode sharedMode; public FixedRangeGenerator(List ranges) { - validateTopicRanges(ranges); + this(ranges, KeySharedMode.JOIN); + } + + public FixedRangeGenerator(List ranges, KeySharedMode sharedMode) { + validateTopicRanges(ranges, sharedMode); + this.ranges = ranges; + this.sharedMode = sharedMode; } @Override public List range(TopicMetadata metadata, int parallelism) { return ranges; } + + @Override + public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) { + return sharedMode; + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java index e8fb3c6338aae..1ac69e00af927 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java @@ -18,21 +18,31 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic.range; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -import java.util.Collections; +import org.apache.pulsar.client.api.SubscriptionType; + import java.util.List; +import static java.util.Collections.singletonList; + /** - * Default implementation for all the Pulsar subscription. We will consume all the messages from the - * Pulsar. + * Default implementation for {@link SubscriptionType#Shared}, {@link SubscriptionType#Failover} and + * {@link SubscriptionType#Exclusive} subscription. */ +@PublicEvolving public class FullRangeGenerator implements RangeGenerator { private static final long serialVersionUID = -4571731955155036216L; @Override public List range(TopicMetadata metadata, int parallelism) { - return Collections.singletonList(TopicRange.createFullRange()); + return singletonList(TopicRange.createFullRange()); + } + + @Override + public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) { + return KeySharedMode.SPLIT; } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java index 6f9621d40360b..b4ef43fd7fe05 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java @@ -50,8 +50,47 @@ public interface RangeGenerator extends Serializable { */ List range(TopicMetadata metadata, int parallelism); + /** + * Defines the default behavior for Key_Shared subscription in Flink. See {@link KeySharedMode} + * for the detailed usage of the key share mode. + * + * @param metadata The metadata of the topic. + * @param parallelism The reader size for this topic. + */ + default KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) { + return KeySharedMode.SPLIT; + } + /** Initialize some extra resources when bootstrap the source. */ default void open(SourceConfiguration sourceConfiguration) { // This method is used for user implementation. } + + /** + * Different Key_Shared mode means different split assignment behaviors. If you only consume a + * subset of Pulsar's key hash range, remember to use the {@link KeySharedMode#JOIN} mode which + * will subscribe all the range in only one reader. Otherwise, when the ranges can join into a + * full Pulsar key hash range (0 ~ 65535) you should use {@link KeySharedMode#SPLIT} for sharing + * the splits among all the backend readers. + * + *

In the {@link KeySharedMode#SPLIT} mode. The topic will be subscribed by multiple readers. + * But Pulsar has one limit in this situation. That is if a Message can't find the corresponding + * reader by the key hash range. No messages will be delivered to the current readers, until + * there is a reader which can subscribe to such messages. + */ + @PublicEvolving + enum KeySharedMode { + + /** + * The topic ranges that the {@link RangeGenerator} generated will be split among the + * readers. + */ + SPLIT, + + /** + * Assign all the topic ranges to only one reader instance. This is used for partial key + * hash range subscription. + */ + JOIN + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java new file mode 100644 index 0000000000000..20f76c6ed96b2 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic.range; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * This range generator would divide the range by the flink source parallelism. It would be the + * default implementation for {@link SubscriptionType#Key_Shared} subscription. + */ +@PublicEvolving +public class SplitRangeGenerator implements RangeGenerator { + private static final long serialVersionUID = -8682286436352905249L; + + private final int start; + private final int end; + + public SplitRangeGenerator() { + this(MIN_RANGE, MAX_RANGE); + } + + public SplitRangeGenerator(int start, int end) { + checkArgument( + start >= MIN_RANGE, + "Start range should be equal to or great than the min range " + MIN_RANGE); + checkArgument( + end <= MAX_RANGE, "End range should below or less than the max range " + MAX_RANGE); + checkArgument(start <= end, "Start range should be equal to or less than the end range"); + + this.start = start; + this.end = end; + } + + @Override + public List range(TopicMetadata metadata, int parallelism) { + final int range = end - start + 1; + final int size = Math.min(range, parallelism); + int startRange = 0; + + List results = new ArrayList<>(size); + for (int i = 1; i < size; i++) { + int nextStartRange = i * range / size; + results.add(new TopicRange(start + startRange, start + nextStartRange - 1)); + startRange = nextStartRange; + } + results.add(new TopicRange(start + startRange, end)); + + return results; + } + + @Override + public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) { + return KeySharedMode.SPLIT; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java index 1ab1d71cc19fc..8728bdeee3f46 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java @@ -21,20 +21,23 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.util.Murmur3_32Hash; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Base64; import java.util.List; +import static java.util.Comparator.comparingLong; import static java.util.stream.Collectors.toList; -import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -43,6 +46,7 @@ */ @PublicEvolving public final class TopicRangeUtils { + private static final Logger LOG = LoggerFactory.getLogger(TopicRangeUtils.class); /** Pulsar would use this as default key if no key was provided. */ public static final String NONE_KEY = "NONE_KEY"; @@ -52,18 +56,39 @@ private TopicRangeUtils() { } /** Make sure all the ranges should be valid in Pulsar Key Shared Policy. */ - public static void validateTopicRanges(TopicRange... ranges) { - checkNotNull(ranges); - checkArgument(ranges.length > 0); - validateTopicRanges(Arrays.asList(ranges)); + public static void validateTopicRanges(List ranges, KeySharedMode sharedMode) { + List pulsarRanges = ranges.stream().map(TopicRange::toPulsarRange).collect(toList()); + KeySharedPolicy.stickyHashRange().ranges(pulsarRanges).validate(); + + if (!isFullTopicRanges(ranges) && KeySharedMode.SPLIT == sharedMode) { + LOG.warn( + "You have provided a partial key hash range with KeySharedMode.SPLIT. " + + "You can't consume any message if there are any messages with keys that are out of the given ranges."); + } } - /** Make sure all the ranges should be valid in Pulsar Key Shared Policy. */ - public static void validateTopicRanges(List ranges) { - List pulsarRanges = ranges.stream().map(TopicRange::toPulsarRange).collect(toList()); - KeySharedPolicy.KeySharedPolicySticky sharedPolicy = - KeySharedPolicy.stickyHashRange().ranges(pulsarRanges); - sharedPolicy.validate(); + /** Check if the given topic ranges are full Pulsar range. */ + public static boolean isFullTopicRanges(List ranges) { + List sorted = + ranges.stream().sorted(comparingLong(TopicRange::getStart)).collect(toList()); + int start = 0; + for (TopicRange range : sorted) { + if (start == 0) { + if (range.getStart() == 0) { + start = range.getEnd(); + continue; + } else { + return false; + } + } + + if (range.getStart() - start != 1) { + return false; + } + start = range.getEnd(); + } + + return start == MAX_RANGE; } /** diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java index a7736a3dc7f70..a41364dfc22e6 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java @@ -53,6 +53,7 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN; import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange; /** The common partition split reader. */ @@ -220,8 +221,13 @@ protected Consumer createPulsarConsumer(TopicPartition partition) { // We may enable out of order delivery for speeding up. It was turned off by default. policy.setAllowOutOfOrderDelivery( sourceConfiguration.isAllowKeySharedOutOfOrderDelivery()); - consumerBuilder.keySharedPolicy(policy); + + if (partition.getMode() == JOIN) { + // Override the key shared subscription into exclusive for making it behaviors like + // a Pulsar Reader which supports partial key hash ranges. + consumerBuilder.subscriptionType(SubscriptionType.Exclusive); + } } // Create the consumer configuration by using common utils. diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java index eab2efb7d1169..fb88d3d910de2 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java @@ -21,6 +21,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.pulsar.client.api.MessageId; @@ -150,20 +151,22 @@ public void serializeTopicPartition(DataOutputStream out, TopicPartition partiti o.writeInt(r.getStart()); o.writeInt(r.getEnd()); }); + out.writeInt(partition.getMode().ordinal()); } public TopicPartition deserializeTopicPartition(int version, DataInputStream in) throws IOException { String topic = in.readUTF(); int partitionId = in.readInt(); - List ranges = null; - + List ranges; + KeySharedMode keySharedMode; if (version == 0) { // VERSION 0 deserialization int start = in.readInt(); int end = in.readInt(); TopicRange range = new TopicRange(start, end); ranges = singletonList(range); + keySharedMode = KeySharedMode.SPLIT; } else { // VERSION 1 deserialization ranges = @@ -174,8 +177,9 @@ public TopicPartition deserializeTopicPartition(int version, DataInputStream in) int end = i.readInt(); return new TopicRange(start, end); }); + keySharedMode = KeySharedMode.values()[in.readInt()]; } - return new TopicPartition(topic, partitionId, ranges); + return new TopicPartition(topic, partitionId, ranges, keySharedMode); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index 14d8cbdd12cec..cbecb783966df 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -53,9 +53,6 @@ private static String resourcePath(String jarName) { protected static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); - // Increase the off heap memory of TaskManager to avoid direct buffer memory error in Pulsar - // e2e tests. - configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100)); // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));