Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/run-pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ jobs:
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
- name: Set up JDK 8
uses: actions/setup-java@v2
with:
maven-version: 3.6.2
java-version: '8'
cache: 'maven'
distribution: 'zulu'
- name: Run Unit Test and Install
run: |
mvn -ntp -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' \
clean install

mvn -ntp -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' clean install
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/datastream/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "so
举个例子,如果通过 `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` 来指定写入的 Topic,那么其结果等价于 `PulsarSink.builder().setTopics("some-topic1")`。
{{< /hint >}}

#### 基于消息实例的动态 Topic 指定

除了前面说的一开始就指定 Topic 或者是 Topic 分区,你还可以在程序启动后基于消息内容动态指定 Topic,只需要实现 `TopicExtractor` 接口即可。
`TopicExtractor` 接口还提供了 `TopicMetadataProvider` 用于查询某个 Topic 在 Pulsar 上有多少个分区,
查询结果会缓存并在 `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 之后失效。

`TopicExtractor` 的返回结果支持带分区信息和不带分区信息的 Topic。

1. 当返回结果里没有分区信息时,我们会查询对应的分区大小,生成所有的分区 Topic,然后传递给 `TopicRouter` 用于路由。分区信息将会被缓存 `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`。
2. 如果你的返回结果里面提供了分区信息,我们则会什么都不做,直接传递给下游。

### 序列化器

序列化器(`PulsarSerializationSchema`)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 `SerializationSchema` 接口实现序列化器和使用 Pulsar 原生的 `Schema` 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 `Schema.AUTO_PRODUCE_BYTES()`。
Expand Down
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/datastream/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,18 @@ For example, when using the `PulsarSink.builder().setTopics("some-topic1", "some
this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
{{< /hint >}}

#### Dynamic Topics by income messages

Topics could be defined by the message content instead of providing the fix topic set. You can dynamically
provide the topic by implementing `TopicExtractor`. The topic metadata in `TopicExtractor` can be queried
by using `TopicMetadataProvider` and the query result would be expired after we have queried for
`PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` time.

You can return two types of value in `TopicExtractor`. A topic name with or without partition information.

1. If you don't want to provide the partition, we would query the partition size and passing all the partitions to `TopicRouter`. The partition size would be cached in `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`.
2. If you provided the topic partition, we would do nothing but just pass it to downstream.

### Serializer

A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
14 changes: 14 additions & 0 deletions flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,20 @@ under the License.
<artifactId>flink-architecture-tests-test</artifactId>
<scope>test</scope>
</dependency>

<!-- Add these two dependencies only for running docker on Github Action. Never commit to flink. -->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>5.5.0</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.apache.pulsar.client.api.CryptoKeyReader;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta

private final SinkConfiguration sinkConfiguration;
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRegister<IN> topicRegister;
private final MessageDelayer<IN> messageDelayer;
private final TopicRouter<IN> topicRouter;

Expand All @@ -95,14 +95,14 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
PulsarSink(
SinkConfiguration sinkConfiguration,
PulsarSerializationSchema<IN> serializationSchema,
TopicMetadataListener metadataListener,
TopicRegister<IN> topicRegister,
TopicRoutingMode topicRoutingMode,
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
@Nullable CryptoKeyReader cryptoKeyReader) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRegister = checkNotNull(topicRegister);
this.messageDelayer = checkNotNull(messageDelayer);
this.cryptoKeyReader = cryptoKeyReader;
checkNotNull(topicRoutingMode);
Expand Down Expand Up @@ -133,7 +133,7 @@ public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext i
return new PulsarWriter<>(
sinkConfiguration,
serializationSchema,
metadataListener,
topicRegister,
topicRouter,
messageDelayer,
cryptoKeyReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.register.DynamicTopicRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.register.EmptyTopicRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.register.FixedTopicRegister;

import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -105,7 +109,7 @@ public class PulsarSinkBuilder<IN> {
private final PulsarConfigBuilder configBuilder;

private PulsarSerializationSchema<IN> serializationSchema;
private TopicMetadataListener metadataListener;
private TopicRegister<IN> topicRegister;
private TopicRoutingMode topicRoutingMode;
private TopicRouter<IN> topicRouter;
private MessageDelayer<IN> messageDelayer;
Expand Down Expand Up @@ -167,10 +171,26 @@ public PulsarSinkBuilder<IN> setTopics(String... topics) {
* @return this PulsarSinkBuilder.
*/
public PulsarSinkBuilder<IN> setTopics(List<String> topics) {
checkState(metadataListener == null, "setTopics couldn't be set twice.");
checkState(topicRegister == null, "setTopics couldn't be set twice.");
// Making sure the topic should be distinct.
List<String> topicSet = distinctTopics(topics);
this.metadataListener = new TopicMetadataListener(topicSet);
if (topicSet.isEmpty()) {
this.topicRegister = new EmptyTopicRegister<>();
} else {
this.topicRegister = new FixedTopicRegister<>(topicSet);
}
return this;
}

/**
* Set a dynamic topic extractor for extracting the topic information.
*
* @return this PulsarSinkBuilder.
*/
public PulsarSinkBuilder<IN> setTopics(TopicExtractor<IN> extractor) {
checkState(topicRegister == null, "setTopics couldn't be set twice.");
this.topicRegister = new DynamicTopicRegister<>(extractor);

return this;
}

Expand Down Expand Up @@ -365,14 +385,14 @@ public PulsarSink<IN> build() {
}

// Topic metadata listener validation.
if (metadataListener == null) {
if (topicRegister == null) {
if (topicRouter == null) {
throw new NullPointerException(
"No topic names or custom topic router are provided.");
} else {
LOG.warn(
"No topic set has been provided, make sure your custom topic router support empty topic set.");
this.metadataListener = new TopicMetadataListener();
this.topicRegister = new EmptyTopicRegister<>();
}
}

Expand Down Expand Up @@ -401,7 +421,7 @@ public PulsarSink<IN> build() {
return new PulsarSink<>(
sinkConfiguration,
serializationSchema,
metadataListener,
topicRegister,
topicRoutingMode,
topicRouter,
messageDelayer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
Expand Down Expand Up @@ -70,13 +70,13 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi

private final SinkConfiguration sinkConfiguration;
private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRegister<IN> topicRegister;
private final TopicRouter<IN> topicRouter;
private final MessageDelayer<IN> messageDelayer;
private final DeliveryGuarantee deliveryGuarantee;
private final PulsarSinkContext sinkContext;
private final MailboxExecutor mailboxExecutor;
private final TopicProducerRegister producerRegister;
private final ProducerRegister producerRegister;

private long pendingMessages = 0;

Expand All @@ -89,21 +89,21 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
*
* @param sinkConfiguration The configuration to configure the Pulsar producer.
* @param serializationSchema Transform the incoming records into different message properties.
* @param metadataListener The listener for querying topic metadata.
* @param topicRegister The listener for querying topic metadata.
* @param topicRouter Topic router to choose topic by incoming records.
* @param initContext Context to provide information about the runtime environment.
*/
public PulsarWriter(
SinkConfiguration sinkConfiguration,
PulsarSerializationSchema<IN> serializationSchema,
TopicMetadataListener metadataListener,
TopicRegister<IN> topicRegister,
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
@Nullable CryptoKeyReader cryptoKeyReader,
InitContext initContext) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRegister = checkNotNull(topicRegister);
this.topicRouter = checkNotNull(topicRouter);
this.messageDelayer = checkNotNull(messageDelayer);
checkNotNull(initContext);
Expand All @@ -115,7 +115,7 @@ public PulsarWriter(
// Initialize topic metadata listener.
LOG.debug("Initialize topic metadata after creating Pulsar writer.");
ProcessingTimeService timeService = initContext.getProcessingTimeService();
this.metadataListener.open(sinkConfiguration, timeService);
this.topicRegister.open(sinkConfiguration, timeService);

// Initialize topic router.
this.topicRouter.open(sinkConfiguration);
Expand All @@ -130,24 +130,25 @@ public PulsarWriter(
}

// Create this producer register after opening serialization schema!
this.producerRegister = new TopicProducerRegister(sinkConfiguration, cryptoKeyReader);
this.producerRegister = new ProducerRegister(sinkConfiguration, cryptoKeyReader);
}

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);

// Choose the right topic to send.
List<String> topics = topicRegister.topics(element);

List<String> availableTopics = metadataListener.availableTopics();
String keyString;
// TODO if both keyBytes and key are set, use keyBytes. This is a temporary solution.
String keyString;
if (message.getKeyBytes() == null) {
keyString = message.getKey();
} else {
keyString = Base64.getEncoder().encodeToString(message.getKeyBytes());
}
String topic = topicRouter.route(element, keyString, availableTopics, sinkContext);

String topic = topicRouter.route(element, keyString, topics, sinkContext);

// Create message builder for sending message.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
Expand Down Expand Up @@ -201,7 +202,7 @@ private void releasePermits() {
this.pendingMessages -= 1;
}

@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "unchecked"})
private TypedMessageBuilder<?> createMessageBuilder(
String topic, Context context, PulsarMessage<?> message) {

Expand Down Expand Up @@ -286,6 +287,6 @@ public Collection<PulsarCommittable> prepareCommit() {
@Override
public void close() throws Exception {
// Close all the resources and throw the exception at last.
closeAll(metadataListener, producerRegister);
closeAll(topicRegister, producerRegister);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@
* we have to create different instances for different topics.
*/
@Internal
public class TopicProducerRegister implements Closeable {
public class ProducerRegister implements Closeable {

private final PulsarClient pulsarClient;
private final SinkConfiguration sinkConfiguration;
@Nullable private final CryptoKeyReader cryptoKeyReader;
private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;
private final Map<String, Map<SchemaInfo, Producer<?>>> register;
private final Map<String, Transaction> transactionRegister;

public TopicProducerRegister(
public ProducerRegister(
SinkConfiguration sinkConfiguration, @Nullable CryptoKeyReader cryptoKeyReader) {
this.pulsarClient = createClient(sinkConfiguration);
this.sinkConfiguration = sinkConfiguration;
this.cryptoKeyReader = cryptoKeyReader;
this.producerRegister = new HashMap<>();
this.register = new HashMap<>();
this.transactionRegister = new HashMap<>();
}

Expand Down Expand Up @@ -116,7 +116,7 @@ public List<PulsarCommittable> prepareCommit() {
* successfully persisted.
*/
public void flush() throws IOException {
Collection<Map<SchemaInfo, Producer<?>>> collection = producerRegister.values();
Collection<Map<SchemaInfo, Producer<?>>> collection = register.values();
for (Map<SchemaInfo, Producer<?>> producers : collection) {
for (Producer<?> producer : producers.values()) {
producer.flush();
Expand All @@ -134,7 +134,7 @@ public void close() throws IOException {
closer.register(this::abortTransactions);

// Remove all the producers.
closer.register(producerRegister::clear);
closer.register(register::clear);

// All the producers would be closed by this method.
// We would block until all the producers have been successfully closed.
Expand All @@ -146,7 +146,7 @@ public void close() throws IOException {
@SuppressWarnings("unchecked")
private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
Map<SchemaInfo, Producer<?>> producers =
producerRegister.computeIfAbsent(topic, key -> new HashMap<>());
register.computeIfAbsent(topic, key -> new HashMap<>());
SchemaInfo schemaInfo = schema.getSchemaInfo();

if (producers.containsKey(schemaInfo)) {
Expand Down
Loading