Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.SplitRangeGenerator;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;

import org.apache.pulsar.client.api.CryptoKeyReader;
Expand Down Expand Up @@ -471,12 +472,15 @@ public PulsarSource<OUT> build() {
checkNotNull(subscriber, "No topic names or topic pattern are provided.");

SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
if (rangeGenerator == null) {
if (subscriptionType == SubscriptionType.Key_Shared) {
if (subscriptionType == SubscriptionType.Key_Shared) {
if (rangeGenerator == null) {
LOG.warn(
"No range generator provided for key_shared subscription,"
+ " we would use the UniformRangeGenerator as the default range generator.");
+ " we would use the SplitRangeGenerator as the default range generator.");
this.rangeGenerator = new SplitRangeGenerator();
}
} else {
// Override the range generator.
this.rangeGenerator = new FullRangeGenerator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;

import javax.annotation.Nonnull;

import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -49,19 +45,6 @@ public PulsarConsumerBuilder(PulsarClient client, Schema<T> schema) {
super((PulsarClientImpl) client, schema);
}

@Override
public ConsumerBuilder<T> subscriptionType(@Nonnull SubscriptionType subscriptionType) {
if (subscriptionType == SubscriptionType.Key_Shared) {
// Override the key shared subscription into exclusive for making it behaviors like a
// Pulsar Reader which supports partial key hash ranges.
super.subscriptionType(SubscriptionType.Exclusive);
} else {
super.subscriptionType(subscriptionType);
}

return this;
}

@Override
public CompletableFuture<Consumer<T>> subscribeAsync() {
PulsarClientImpl client = super.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public final class PulsarSourceConfigUtils {
.build();

private PulsarSourceConfigUtils() {
// No need to create instance.
// No need to create an instance.
}

public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,7 +40,7 @@ public abstract class BasePulsarSubscriber implements PulsarSubscriber {
protected TopicMetadata queryTopicMetadata(PulsarAdminRequest adminRequest, String topicName) {
try {
return adminRequest.getTopicMetadata(topicName);
} catch (NotFoundException e) {
} catch (PulsarAdminException.NotFoundException e) {
return null;
} catch (PulsarAdminException e) {
sneakyThrow(e);
Expand All @@ -49,18 +49,35 @@ protected TopicMetadata queryTopicMetadata(PulsarAdminRequest adminRequest, Stri
}

protected List<TopicPartition> toTopicPartitions(
TopicMetadata metadata, List<TopicRange> ranges) {
TopicMetadata metadata, List<TopicRange> ranges, KeySharedMode mode) {
if (!metadata.isPartitioned()) {
// For non-partitioned topic.
TopicPartition partition = new TopicPartition(metadata.getName(), -1, ranges);
return singletonList(partition);
return toTopicPartitions(metadata.getName(), -1, ranges, mode);
} else {
// For partitioned topic.
List<TopicPartition> partitions = new ArrayList<>();
for (int i = 0; i < metadata.getPartitionSize(); i++) {
partitions.add(new TopicPartition(metadata.getName(), i, ranges));
partitions.addAll(toTopicPartitions(metadata.getName(), i, ranges, mode));
}

return partitions;
}
}

protected List<TopicPartition> toTopicPartitions(
String topic, int partitionId, List<TopicRange> ranges, KeySharedMode mode) {
switch (mode) {
case JOIN:
return singletonList(new TopicPartition(topic, partitionId, ranges, mode));
case SPLIT:
List<TopicPartition> partitions = new ArrayList<>(ranges.size());
for (TopicRange range : ranges) {
TopicPartition partition =
new TopicPartition(topic, partitionId, singletonList(range), mode);
partitions.add(partition);
}
return partitions;
default:
throw new UnsupportedOperationException(mode + " isn't supported.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import org.apache.pulsar.common.naming.TopicName;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;

/** the implements of consuming multiple topics. */
public class TopicListSubscriber extends BasePulsarSubscriber {
Expand Down Expand Up @@ -61,21 +60,29 @@ public TopicListSubscriber(List<String> topics) {
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(
PulsarAdminRequest metadataRequest, RangeGenerator rangeGenerator, int parallelism) {
Stream<TopicMetadata> partitionStream =
partitions
.parallelStream()
.map(partition -> new TopicMetadata(partition, NON_PARTITIONED));
Stream<TopicMetadata> topicStream =
topics.parallelStream()
.map(topic -> queryTopicMetadata(metadataRequest, topic))
.filter(Objects::nonNull);

return Stream.concat(partitionStream, topicStream)
.flatMap(
metadata -> {
List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
return toTopicPartitions(metadata, ranges).stream();
})
.collect(toSet());
Set<TopicPartition> results = new HashSet<>();

// Query topics from Pulsar.
for (String topic : topics) {
TopicMetadata metadata = queryTopicMetadata(metadataRequest, topic);
List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
RangeGenerator.KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism);

results.addAll(toTopicPartitions(metadata, ranges, mode));
}

for (String partition : partitions) {
TopicName topicName = TopicName.get(partition);
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();

TopicMetadata metadata = queryTopicMetadata(metadataRequest, name);
List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
RangeGenerator.KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism);

results.addAll(toTopicPartitions(name, index, ranges, mode));
}

return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public Set<TopicPartition> getSubscribedTopicPartitions(
metadata -> {
List<TopicRange> ranges =
rangeGenerator.range(metadata, parallelism);
return toTopicPartitions(metadata, ranges).stream();
RangeGenerator.KeySharedMode mode =
rangeGenerator.keyShareMode(metadata, parallelism);
return toTopicPartitions(metadata, ranges, mode).stream();
})
.collect(toSet());
} catch (PulsarAdminException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public static boolean isPartitioned(String topic) {
return TopicName.get(topic).isPartitioned();
}

public static int extractPartitionId(String topic) {
return TopicName.get(topic).getPartitionIndex();
}

/** Merge the same partitions into one topic. */
public static List<String> distinctTopics(List<String> topics) {
Set<String> fullTopics = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.extractPartitionId;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.SPLIT;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -50,15 +55,17 @@ public class TopicPartition implements Serializable {
*/
public static final int NON_PARTITION_ID = -1;

private static final List<TopicRange> FULL_RANGES = ImmutableList.of(createFullRange());

/**
* The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant
* The topic name of the pulsar. It would be a full topic name. If you don't provide the tenant
* and namespace, we would add them automatically.
*/
private final String topic;

/**
* Index of partition for the topic. It would be natural number for partitioned topic with a
* non-key_shared subscription.
* Index of partition for the topic. It would be a natural number for the partitioned topic with
* a non-key_shared subscription.
*/
private final int partitionId;

Expand All @@ -69,30 +76,33 @@ public class TopicPartition implements Serializable {
*/
private final List<TopicRange> ranges;

/** Create a top-level topic without partition information. */
/**
* The key share mode for the {@link SubscriptionType#Key_Shared}. It will be {@link
* KeySharedMode#JOIN} for other subscriptions.
*/
private final KeySharedMode mode;

public TopicPartition(String topic) {
TopicName topicName = TopicName.get(topic);
this.topic = topicName.getPartitionedTopicName();
this.partitionId =
topicName.isPartitioned() ? topicName.getPartitionIndex() : NON_PARTITION_ID;
this.ranges = singletonList(createFullRange());
this(topic, extractPartitionId(topic));
}

/** Create a topic partition without key hash range. */
public TopicPartition(String topic, int partitionId) {
this(topic, partitionId, singletonList(createFullRange()));
this(topic, partitionId, FULL_RANGES, SPLIT);
}

@Internal
public TopicPartition(String topic, int partitionId, List<TopicRange> ranges) {
TopicName topicName = TopicName.get(topic);
this(topic, partitionId, ranges, SPLIT);
}

this.topic = topicName.getPartitionedTopicName();
public TopicPartition(
String topic, int partitionId, List<TopicRange> ranges, KeySharedMode mode) {
this.topic = topicName(checkNotNull(topic));
this.partitionId =
partitionId == NON_PARTITION_ID && topicName.isPartitioned()
? topicName.getPartitionIndex()
partitionId == NON_PARTITION_ID && isPartitioned(topic)
? extractPartitionId(topic)
: partitionId;
this.ranges = checkNotNull(ranges);
this.mode = mode;
}

public String getTopic() {
Expand Down Expand Up @@ -120,18 +130,24 @@ public String getFullTopicName() {
}
}

/** This method is only internal used for serialization. */
/** This method is internal used for serialization. */
@Internal
public List<TopicRange> getRanges() {
return ranges;
}

/** This method is only internal used for define key shared subscription. */
/** This method is internal used for define key shared subscription. */
@Internal
public List<Range> getPulsarRanges() {
return ranges.stream().map(TopicRange::toPulsarRange).collect(toList());
}

/** This method is internal used for key shared mode. */
@Internal
public KeySharedMode getMode() {
return mode;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -144,12 +160,13 @@ public boolean equals(Object o) {

return partitionId == partition.partitionId
&& topic.equals(partition.topic)
&& ranges.equals(partition.ranges);
&& ranges.equals(partition.ranges)
&& mode == partition.mode;
}

@Override
public int hashCode() {
return Objects.hash(topic, partitionId, ranges);
return Objects.hash(topic, partitionId, ranges, mode);
}

@Override
Expand Down
Loading