diff --git a/.github/workflows/run-pr-check.yml b/.github/workflows/run-pr-check.yml index 54c7ef67b2a11..c4b097e3dffb0 100644 --- a/.github/workflows/run-pr-check.yml +++ b/.github/workflows/run-pr-check.yml @@ -12,12 +12,12 @@ jobs: uses: actions/checkout@v2 with: ref: ${{ github.event.pull_request.head.sha }} - - name: Set up Maven - uses: apache/pulsar-test-infra/setup-maven@master + - name: Set up JDK 8 + uses: actions/setup-java@v2 with: - maven-version: 3.6.2 + java-version: '8' + cache: 'maven' + distribution: 'zulu' - name: Run Unit Test and Install run: | - mvn -ntp -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' \ - clean install - + mvn -ntp -pl 'flink-connectors/flink-connector-pulsar,flink-connectors/flink-sql-connector-pulsar' clean install diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md index 4c402f0e48c61..6c428f2a4166a 100644 --- a/docs/content.zh/docs/connectors/datastream/pulsar.md +++ b/docs/content.zh/docs/connectors/datastream/pulsar.md @@ -404,6 +404,17 @@ PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "so 举个例子,如果通过 `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` 来指定写入的 Topic,那么其结果等价于 `PulsarSink.builder().setTopics("some-topic1")`。 {{< /hint >}} +#### 基于消息实例的动态 Topic 指定 + +除了前面说的一开始就指定 Topic 或者是 Topic 分区,你还可以在程序启动后基于消息内容动态指定 Topic,只需要实现 `TopicExtractor` 接口即可。 +`TopicExtractor` 接口还提供了 `TopicMetadataProvider` 用于查询某个 Topic 在 Pulsar 上有多少个分区, +查询结果会缓存并在 `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 之后失效。 + +`TopicExtractor` 的返回结果支持带分区信息和不带分区信息的 Topic。 + +1. 当返回结果里没有分区信息时,我们会查询对应的分区大小,生成所有的分区 Topic,然后传递给 `TopicRouter` 用于路由。分区信息将会被缓存 `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`。 +2. 
如果你的返回结果里面提供了分区信息,我们则会什么都不做,直接传递给下游。 + ### 序列化器 序列化器(`PulsarSerializationSchema`)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 `SerializationSchema` 接口实现序列化器和使用 Pulsar 原生的 `Schema` 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 `Schema.AUTO_PRODUCE_BYTES()`。 diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md index 4d616e5f69cf0..15115c673b10b 100644 --- a/docs/content/docs/connectors/datastream/pulsar.md +++ b/docs/content/docs/connectors/datastream/pulsar.md @@ -493,6 +493,18 @@ For example, when using the `PulsarSink.builder().setTopics("some-topic1", "some this is simplified to `PulsarSink.builder().setTopics("some-topic1")`. {{< /hint >}} +#### Dynamic Topics by incoming messages + +Topics can be defined by the message content instead of providing a fixed topic set. You can dynamically +provide the topic by implementing `TopicExtractor`. The topic metadata in `TopicExtractor` can be queried +by using `TopicMetadataProvider`; the query result is cached and expires after +`PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`. + +You can return two types of value in `TopicExtractor`: a topic name with or without partition information. + +1. If you don't provide the partition, we query the partition size and pass all the partitions to `TopicRouter`. The partition size is cached for `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`. +2. If you provide the topic partition, we do nothing but pass it downstream. + ### Serializer A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes. 
diff --git a/flink-connectors/flink-connector-pulsar/archunit-violations/a2ce237e-b050-4ba0-8748-d83637a207a8 b/flink-connectors/flink-connector-pulsar/archunit-violations/a2ce237e-b050-4ba0-8748-d83637a207a8 index e69de29bb2d1d..36b34896247f5 100644 --- a/flink-connectors/flink-connector-pulsar/archunit-violations/a2ce237e-b050-4ba0-8748-d83637a207a8 +++ b/flink-connectors/flink-connector-pulsar/archunit-violations/a2ce237e-b050-4ba0-8748-d83637a207a8 @@ -0,0 +1,6 @@ +org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index 77ad83e12263d..b452d0f7c3459 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -293,6 +293,20 @@ under the License. 
flink-architecture-tests-test test + + + + net.java.dev.jna + jna + 5.5.0 + test + + + net.java.dev.jna + jna-platform + 5.5.0 + test + diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java index 77c1077fdff6a..d952c5f016b32 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java @@ -34,7 +34,7 @@ import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; -import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -86,7 +86,7 @@ public class PulsarSink implements TwoPhaseCommittingSink serializationSchema; - private final TopicMetadataListener metadataListener; + private final TopicRegister topicRegister; private final MessageDelayer messageDelayer; private final TopicRouter topicRouter; @@ -95,14 +95,14 @@ public class PulsarSink implements TwoPhaseCommittingSink serializationSchema, - TopicMetadataListener metadataListener, + TopicRegister topicRegister, TopicRoutingMode topicRoutingMode, TopicRouter topicRouter, MessageDelayer messageDelayer, @Nullable CryptoKeyReader cryptoKeyReader) { this.sinkConfiguration = checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); - this.metadataListener = checkNotNull(metadataListener); + this.topicRegister = checkNotNull(topicRegister); 
this.messageDelayer = checkNotNull(messageDelayer); this.cryptoKeyReader = cryptoKeyReader; checkNotNull(topicRoutingMode); @@ -133,7 +133,7 @@ public PrecommittingSinkWriter createWriter(InitContext i return new PulsarWriter<>( sinkConfiguration, serializationSchema, - metadataListener, + topicRegister, topicRouter, messageDelayer, cryptoKeyReader, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index 94865e5541973..96ca03876a4fd 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -30,7 +30,11 @@ import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; -import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; +import org.apache.flink.connector.pulsar.sink.writer.topic.register.DynamicTopicRegister; +import org.apache.flink.connector.pulsar.sink.writer.topic.register.EmptyTopicRegister; +import org.apache.flink.connector.pulsar.sink.writer.topic.register.FixedTopicRegister; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Schema; @@ -105,7 +109,7 @@ public class PulsarSinkBuilder { private final PulsarConfigBuilder configBuilder; private PulsarSerializationSchema serializationSchema; - private TopicMetadataListener metadataListener; + private TopicRegister 
topicRegister; private TopicRoutingMode topicRoutingMode; private TopicRouter topicRouter; private MessageDelayer messageDelayer; @@ -167,10 +171,26 @@ public PulsarSinkBuilder setTopics(String... topics) { * @return this PulsarSinkBuilder. */ public PulsarSinkBuilder setTopics(List topics) { - checkState(metadataListener == null, "setTopics couldn't be set twice."); + checkState(topicRegister == null, "setTopics couldn't be set twice."); // Making sure the topic should be distinct. List topicSet = distinctTopics(topics); - this.metadataListener = new TopicMetadataListener(topicSet); + if (topicSet.isEmpty()) { + this.topicRegister = new EmptyTopicRegister<>(); + } else { + this.topicRegister = new FixedTopicRegister<>(topicSet); + } + return this; + } + + /** + * Set a dynamic topic extractor for extracting the topic information. + * + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder setTopics(TopicExtractor extractor) { + checkState(topicRegister == null, "setTopics couldn't be set twice."); + this.topicRegister = new DynamicTopicRegister<>(extractor); + return this; } @@ -365,14 +385,14 @@ public PulsarSink build() { } // Topic metadata listener validation. 
- if (metadataListener == null) { + if (topicRegister == null) { if (topicRouter == null) { throw new NullPointerException( "No topic names or custom topic router are provided."); } else { LOG.warn( "No topic set has been provided, make sure your custom topic router support empty topic set."); - this.metadataListener = new TopicMetadataListener(); + this.topicRegister = new EmptyTopicRegister<>(); } } @@ -401,7 +421,7 @@ public PulsarSink build() { return new PulsarSink<>( sinkConfiguration, serializationSchema, - metadataListener, + topicRegister, topicRoutingMode, topicRouter, messageDelayer, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java index 360f742bfd0b1..b0ce536139cb1 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java @@ -33,8 +33,8 @@ import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; -import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; -import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.base.Strings; @@ -70,13 +70,13 @@ public class PulsarWriter implements PrecommittingSinkWriter serializationSchema; - private final TopicMetadataListener 
metadataListener; + private final TopicRegister topicRegister; private final TopicRouter topicRouter; private final MessageDelayer messageDelayer; private final DeliveryGuarantee deliveryGuarantee; private final PulsarSinkContext sinkContext; private final MailboxExecutor mailboxExecutor; - private final TopicProducerRegister producerRegister; + private final ProducerRegister producerRegister; private long pendingMessages = 0; @@ -89,21 +89,21 @@ public class PulsarWriter implements PrecommittingSinkWriter serializationSchema, - TopicMetadataListener metadataListener, + TopicRegister topicRegister, TopicRouter topicRouter, MessageDelayer messageDelayer, @Nullable CryptoKeyReader cryptoKeyReader, InitContext initContext) { this.sinkConfiguration = checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); - this.metadataListener = checkNotNull(metadataListener); + this.topicRegister = checkNotNull(topicRegister); this.topicRouter = checkNotNull(topicRouter); this.messageDelayer = checkNotNull(messageDelayer); checkNotNull(initContext); @@ -115,7 +115,7 @@ public PulsarWriter( // Initialize topic metadata listener. LOG.debug("Initialize topic metadata after creating Pulsar writer."); ProcessingTimeService timeService = initContext.getProcessingTimeService(); - this.metadataListener.open(sinkConfiguration, timeService); + this.topicRegister.open(sinkConfiguration, timeService); // Initialize topic router. this.topicRouter.open(sinkConfiguration); @@ -130,7 +130,7 @@ public PulsarWriter( } // Create this producer register after opening serialization schema! 
- this.producerRegister = new TopicProducerRegister(sinkConfiguration, cryptoKeyReader); + this.producerRegister = new ProducerRegister(sinkConfiguration, cryptoKeyReader); } @Override @@ -138,16 +138,17 @@ public void write(IN element, Context context) throws IOException, InterruptedEx PulsarMessage message = serializationSchema.serialize(element, sinkContext); // Choose the right topic to send. + List topics = topicRegister.topics(element); - List availableTopics = metadataListener.availableTopics(); - String keyString; // TODO if both keyBytes and key are set, use keyBytes. This is a temporary solution. + String keyString; if (message.getKeyBytes() == null) { keyString = message.getKey(); } else { keyString = Base64.getEncoder().encodeToString(message.getKeyBytes()); } - String topic = topicRouter.route(element, keyString, availableTopics, sinkContext); + + String topic = topicRouter.route(element, keyString, topics, sinkContext); // Create message builder for sending message. TypedMessageBuilder builder = createMessageBuilder(topic, context, message); @@ -201,7 +202,7 @@ private void releasePermits() { this.pendingMessages -= 1; } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) private TypedMessageBuilder createMessageBuilder( String topic, Context context, PulsarMessage message) { @@ -286,6 +287,6 @@ public Collection prepareCommit() { @Override public void close() throws Exception { // Close all the resources and throw the exception at last. 
- closeAll(metadataListener, producerRegister); + closeAll(topicRegister, producerRegister); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java similarity index 94% rename from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java rename to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java index e83c56c77773c..d25b8ef00521b 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java @@ -59,20 +59,20 @@ * we have to create different instances for different topics. */ @Internal -public class TopicProducerRegister implements Closeable { +public class ProducerRegister implements Closeable { private final PulsarClient pulsarClient; private final SinkConfiguration sinkConfiguration; @Nullable private final CryptoKeyReader cryptoKeyReader; - private final Map>> producerRegister; + private final Map>> register; private final Map transactionRegister; - public TopicProducerRegister( + public ProducerRegister( SinkConfiguration sinkConfiguration, @Nullable CryptoKeyReader cryptoKeyReader) { this.pulsarClient = createClient(sinkConfiguration); this.sinkConfiguration = sinkConfiguration; this.cryptoKeyReader = cryptoKeyReader; - this.producerRegister = new HashMap<>(); + this.register = new HashMap<>(); this.transactionRegister = new HashMap<>(); } @@ -116,7 +116,7 @@ public List prepareCommit() { * successfully persisted. 
*/ public void flush() throws IOException { - Collection>> collection = producerRegister.values(); + Collection>> collection = register.values(); for (Map> producers : collection) { for (Producer producer : producers.values()) { producer.flush(); @@ -134,7 +134,7 @@ public void close() throws IOException { closer.register(this::abortTransactions); // Remove all the producers. - closer.register(producerRegister::clear); + closer.register(register::clear); // All the producers would be closed by this method. // We would block until all the producers have been successfully closed. @@ -146,7 +146,7 @@ public void close() throws IOException { @SuppressWarnings("unchecked") private Producer getOrCreateProducer(String topic, Schema schema) { Map> producers = - producerRegister.computeIfAbsent(topic, key -> new HashMap<>()); + register.computeIfAbsent(topic, key -> new HashMap<>()); SchemaInfo schemaInfo = schema.getSchemaInfo(); if (producers.containsKey(schemaInfo)) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicExtractor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicExtractor.java new file mode 100644 index 0000000000000..4d8de8037a07a --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicExtractor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.topic; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; + +import org.apache.pulsar.client.admin.PulsarAdmin; + +import java.io.Serializable; + +/** Choose topics from the message, used for dynamic generate topics in Pulsar sink. */ +@PublicEvolving +public interface TopicExtractor extends Serializable { + + /** + * @param in The message would be written to Pulsar. + * @param provider Used for query topic metadata. + * @return The topic you want to use. You can use both partitioned topic name or a topic name + * without partition information. We would query the partition information and pass it to + * {@link TopicRouter} if you return a topic name without partition information. + */ + TopicPartition extract(IN in, TopicMetadataProvider provider); + + /** + * A wrapper for {@link PulsarAdmin} instance, we won't expose the Pulsar admin interface for + * better control the abstraction. And add cache support. + */ + @PublicEvolving + interface TopicMetadataProvider { + + /** @throws Exception Failed to query Pulsar metadata would throw this exception. 
*/ + TopicMetadata query(String topic) throws Exception; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicRegister.java new file mode 100644 index 0000000000000..925334923e72c --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicRegister.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.topic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; + +import java.io.Closeable; +import java.io.Serializable; +import java.util.List; + +/** The topic register for returning the available topic partitions. */ +@Internal +public interface TopicRegister extends Serializable, Closeable { + + /** + * Return all the available topic partitions. We would recalculate the partitions if the topic + * metadata has been changed. 
Otherwise, we would return the cached result for better + * performance. + */ + List topics(IN in); + + /** Register the topic metadata update in process time service. */ + void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService); +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegister.java new file mode 100644 index 0000000000000..a1a3e3ae14d00 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegister.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.topic.register; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor.TopicMetadataProvider; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.singletonList; +import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The register for returning dynamic topic partitions information. */ +@Internal +public class DynamicTopicRegister implements TopicRegister { + private static final long serialVersionUID = 4374769306761301456L; + + private final TopicExtractor topicExtractor; + + // Dynamic fields. 
+ private transient PulsarAdmin pulsarAdmin; + private transient TopicMetadataProvider metadataProvider; + private transient LoadingCache> partitionsCache; + + public DynamicTopicRegister(TopicExtractor topicExtractor) { + this.topicExtractor = checkNotNull(topicExtractor); + } + + @Override + public List topics(IN in) { + TopicPartition partition = topicExtractor.extract(in, metadataProvider); + String topicName = partition.getFullTopicName(); + + if (partition.isPartition()) { + return singletonList(topicName); + } else { + try { + return partitionsCache.get(topicName); + } catch (ExecutionException e) { + throw new FlinkRuntimeException("Failed to query Pulsar topic partitions.", e); + } + } + } + + @Override + public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) { + long refreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval(); + + // Initialize Pulsar admin instance. + this.pulsarAdmin = createAdmin(sinkConfiguration); + this.metadataProvider = new DefaultTopicMetadataProvider(pulsarAdmin, refreshInterval); + this.partitionsCache = + CacheBuilder.newBuilder() + .expireAfterWrite(refreshInterval, TimeUnit.MILLISECONDS) + .build( + new CacheLoader>() { + @Override + public List load(String topic) throws Exception { + TopicMetadata metadata = metadataProvider.query(topic); + if (metadata.isPartitioned()) { + int partitionSize = metadata.getPartitionSize(); + List partitions = + new ArrayList<>(partitionSize); + for (int i = 0; i < partitionSize; i++) { + partitions.add(topicNameWithPartition(topic, i)); + } + return partitions; + } else { + return singletonList(topic); + } + } + }); + } + + @Override + public void close() throws IOException { + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + } + + private static class DefaultTopicMetadataProvider implements TopicMetadataProvider { + + private final LoadingCache metadataCache; + + private DefaultTopicMetadataProvider(PulsarAdmin pulsarAdmin, long 
refreshInterval) { + this.metadataCache = + CacheBuilder.newBuilder() + .expireAfterWrite(refreshInterval, TimeUnit.MILLISECONDS) + .build( + new CacheLoader() { + @Override + public TopicMetadata load(String topic) throws Exception { + PartitionedTopicMetadata metadata = + pulsarAdmin + .topics() + .getPartitionedTopicMetadata(topic); + return new TopicMetadata(topic, metadata.partitions); + } + }); + } + + @Override + public TopicMetadata query(String topic) throws ExecutionException { + return metadataCache.get(topic); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/EmptyTopicRegister.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/EmptyTopicRegister.java new file mode 100644 index 0000000000000..b350b3cac3471 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/EmptyTopicRegister.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.topic.register; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** The topic register which would do nothing for just return an empty topic partitions. */ +@Internal +public class EmptyTopicRegister implements TopicRegister { + private static final long serialVersionUID = -9199261243659491097L; + + @Override + public List topics(IN in) { + return Collections.emptyList(); + } + + @Override + public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) { + // Nothing to do. + } + + @Override + public void close() throws IOException { + // Nothing to do. + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegister.java similarity index 89% rename from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java rename to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegister.java index 040438f902349..ec0d45a4919de 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegister.java @@ -16,11 +16,12 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.pulsar.sink.writer.topic; +package org.apache.flink.connector.pulsar.sink.writer.topic.register; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister; import org.apache.flink.shaded.guava30.com.google.common.base.Objects; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; @@ -31,15 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static java.util.Collections.emptyList; import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned; @@ -53,10 +51,10 @@ * built-in logic. We use Flink's ProcessingTimer as the executor. 
*/ @Internal -public class TopicMetadataListener implements Serializable, Closeable { +public class FixedTopicRegister implements TopicRegister { private static final long serialVersionUID = 6186948471557507522L; - private static final Logger LOG = LoggerFactory.getLogger(TopicMetadataListener.class); + private static final Logger LOG = LoggerFactory.getLogger(FixedTopicRegister.class); private final ImmutableList partitionedTopics; private final Map topicMetadata; @@ -67,11 +65,7 @@ public class TopicMetadataListener implements Serializable, Closeable { private transient Long topicMetadataRefreshInterval; private transient ProcessingTimeService timeService; - public TopicMetadataListener() { - this(emptyList()); - } - - public TopicMetadataListener(List topics) { + public FixedTopicRegister(List topics) { List partitions = new ArrayList<>(topics.size()); Map metadata = new HashMap<>(topics.size()); for (String topic : topics) { @@ -88,7 +82,7 @@ public TopicMetadataListener(List topics) { this.availableTopics = ImmutableList.of(); } - /** Register the topic metadata update in process time service. */ + @Override public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) { if (topicMetadata.isEmpty()) { LOG.info("No topics have been provided, skip listener initialize."); @@ -107,12 +101,8 @@ public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService time triggerNextTopicMetadataUpdate(true); } - /** - * Return all the available topic partitions. We would recalculate the partitions if the topic - * metadata has been changed. Otherwise, we would return the cached result for better - * performance. 
- */ - public List availableTopics() { + @Override + public List topics(IN in) { if (availableTopics.isEmpty() && (!partitionedTopics.isEmpty() || !topicMetadata.isEmpty())) { List results = new ArrayList<>(); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index e46e1906d63f5..769585182e254 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -73,6 +73,7 @@ public String toString() { /** * The position type for reader to choose whether timestamp or message id as the start position. */ + @PublicEvolving public enum Type { TIMESTAMP, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java index 57d5f02ed24c0..f9ff0618215b8 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java @@ -27,11 +27,12 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import java.util.ArrayList; import java.util.List; -import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition.NON_PARTITION_ID; /** PulsarSubscriber abstract class to simplify Pulsar admin related operations. */ public abstract class BasePulsarSubscriber implements PulsarSubscriber { @@ -60,21 +61,18 @@ protected List toTopicPartitions( if (!metadata.isPartitioned()) { // For non-partitioned topic. return ranges.stream() - .map(range -> new TopicPartition(metadata.getName(), -1, range)) + .map(range -> new TopicPartition(metadata.getName(), NON_PARTITION_ID, range)) .collect(toList()); } else { - return IntStream.range(0, metadata.getPartitionSize()) - .boxed() - .flatMap( - partitionId -> - ranges.stream() - .map( - range -> - new TopicPartition( - metadata.getName(), - partitionId, - range))) - .collect(toList()); + List partitions = new ArrayList<>(); + for (int i = 0; i < metadata.getPartitionSize(); i++) { + for (TopicRange range : ranges) { + TopicPartition partition = new TopicPartition(metadata.getName(), i, range); + partitions.add(partition); + } + } + + return partitions; } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java index b3035cde8485e..64901e24d303c 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java @@ -29,6 +29,7 @@ import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import 
static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -39,6 +40,12 @@ public class TopicPartition implements Serializable { private static final long serialVersionUID = -1474354741550810953L; + /** + * The partition id of a {@link TopicPartition} which isn't a real partition. Such an instance + * represents the top-level topic name instead of one of its partitions. + */ + public static final int NON_PARTITION_ID = -1; + /** * The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant * and namespace, we would add them automatically. @@ -58,6 +65,16 @@ public class TopicPartition implements Serializable { */ private final TopicRange range; + /** Create a top level topic without partition information. */ + public TopicPartition(String topic) { + this(topic, NON_PARTITION_ID); + } + + /** Create a topic partition without key hash range. */ + public TopicPartition(String topic, int partitionId) { + this(topic, partitionId, createFullRange()); + } + public TopicPartition(String topic, int partitionId, TopicRange range) { this.topic = topicName(checkNotNull(topic)); this.partitionId = partitionId; @@ -72,12 +89,16 @@ public int getPartitionId() { return partitionId; } + public boolean isPartition() { + return partitionId != NON_PARTITION_ID; + } + /** * Pulsar split the topic partition into a bunch of small topics, we would get the real topic * name by using this method.
*/ public String getFullTopicName() { - if (partitionId >= 0) { + if (isPartition()) { return topicNameWithPartition(topic, partitionId); } else { return topic; diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java index 1508b8732a311..09d4882ffe9da 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java @@ -48,6 +48,9 @@ public class TopicRange implements Serializable { /** The end position for hash range, it's 65535. */ public static final int MAX_RANGE = RANGE_SIZE - 1; + /** A full topic range instance for avoiding multiple instance creation. */ + private static final TopicRange FULL_RANGE = new TopicRange(MIN_RANGE, MAX_RANGE); + /** The start of the range, default is zero. */ private final int start; @@ -70,7 +73,7 @@ public Range toPulsarRange() { /** Create a topic range which contains the full hash range. 
*/ public static TopicRange createFullRange() { - return new TopicRange(MIN_RANGE, MAX_RANGE); + return FULL_RANGE; } public int getStart() { diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java index f71facd994aa6..9fc272a7338c8 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java @@ -31,7 +31,7 @@ import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer; import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; -import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.register.FixedTopicRegister; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; @@ -80,7 +80,7 @@ void writeMessageWithoutGuarantee(DeliveryGuarantee guarantee) throws Exception SinkConfiguration configuration = sinkConfiguration(guarantee); PulsarSerializationSchema schema = pulsarSchema(STRING); - TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic)); + FixedTopicRegister listener = new FixedTopicRegister(singletonList(topic)); RoundRobinTopicRouter router = new RoundRobinTopicRouter<>(configuration); FixedMessageDelayer delayer = MessageDelayer.never(); MockInitContext initContext = new MockInitContext(); diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegisterTest.java similarity index 92% rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegisterTest.java index fa4a1f5109ebd..135ce8779b3f3 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegisterTest.java @@ -38,8 +38,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -/** Unit tests for {@link TopicProducerRegister}. */ -class TopicProducerRegisterTest extends PulsarTestSuiteBase { +/** Unit tests for {@link ProducerRegister}. 
*/ +class ProducerRegisterTest extends PulsarTestSuiteBase { @ParameterizedTest @EnumSource(DeliveryGuarantee.class) @@ -49,7 +49,7 @@ void createMessageBuilderForSendingMessage(DeliveryGuarantee deliveryGuarantee) operator().createTopic(topic, 8); SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee); - TopicProducerRegister register = new TopicProducerRegister(configuration, null); + ProducerRegister register = new ProducerRegister(configuration, null); String message = randomAlphabetic(10); register.createMessageBuilder(topic, Schema.STRING).value(message).send(); @@ -76,7 +76,7 @@ void noneAndAtLeastOnceWouldNotCreateTransaction(DeliveryGuarantee deliveryGuara operator().createTopic(topic, 8); SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee); - TopicProducerRegister register = new TopicProducerRegister(configuration, null); + ProducerRegister register = new ProducerRegister(configuration, null); String message = randomAlphabetic(10); register.createMessageBuilder(topic, Schema.STRING).value(message).sendAsync(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegisterTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegisterTest.java new file mode 100644 index 0000000000000..c5290d6bf78a8 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/DynamicTopicRegisterTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.topic.register; + +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicExtractor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** Unit tests for {@link DynamicTopicRegister}. 
*/ +class DynamicTopicRegisterTest extends PulsarTestSuiteBase { + + private final MockTopicExtractor extractor = new MockTopicExtractor(); + + private static final class MockTopicExtractor implements TopicExtractor { + private static final long serialVersionUID = 2456172645787498006L; + + private TopicPartition partition; + + @Override + public TopicPartition extract(String s, TopicMetadataProvider provider) { + return partition; + } + + public void setPartition(TopicPartition partition) { + this.partition = partition; + } + } + + @Test + void partitionedTopicWouldBeReturnedDirectly() throws IOException { + DynamicTopicRegister register = topicRegister(50000); + TopicPartition partition = new TopicPartition("some", 1); + extractor.setPartition(partition); + List topics = register.topics(randomAlphabetic(10)); + + assertThat(topics) + .hasSize(1) + .allSatisfy(topic -> assertThat(topic).isEqualTo(partition.getFullTopicName())); + + register.close(); + } + + @Test + void rootTopicWillReturnAllThePartitions() throws IOException { + DynamicTopicRegister register = topicRegister(50000); + TopicPartition partition = new TopicPartition("root-topic" + randomAlphabetic(10)); + extractor.setPartition(partition); + operator().createTopic(partition.getFullTopicName(), 10); + List topics = register.topics(randomAlphabetic(10)); + + assertThat(topics) + .hasSize(10) + .allSatisfy(topic -> assertThat(topic).startsWith(partition.getTopic())); + + register.close(); + } + + private DynamicTopicRegister topicRegister(long interval) { + DynamicTopicRegister register = new DynamicTopicRegister<>(extractor); + register.open(sinkConfiguration(interval), mock(ProcessingTimeService.class)); + + return register; + } + + private SinkConfiguration sinkConfiguration(long interval) { + Configuration configuration = operator().config(); + configuration.set(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL, interval); + + return new SinkConfiguration(configuration); + } +} diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegisterTest.java similarity index 78% rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegisterTest.java index b34d229c8feac..b0ee0c5ddc7cc 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/register/FixedTopicRegisterTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.pulsar.sink.writer.topic; +package org.apache.flink.connector.pulsar.sink.writer.topic.register; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; @@ -25,11 +25,13 @@ import org.junit.jupiter.api.Test; +import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.stream.IntStream; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; @@ -39,21 +41,23 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -/** Unit tests for {@link TopicMetadataListener}. */ -class TopicMetadataListenerTest extends PulsarTestSuiteBase { +/** Unit tests for {@link FixedTopicRegister}. 
*/ +class FixedTopicRegisterTest extends PulsarTestSuiteBase { @Test - void listenEmptyTopics() { - TopicMetadataListener listener = new TopicMetadataListener(); + void listenEmptyTopics() throws IOException { + FixedTopicRegister listener = new FixedTopicRegister<>(emptyList()); SinkConfiguration configuration = sinkConfiguration(Duration.ofMinutes(5).toMillis()); TestProcessingTimeService timeService = new TestProcessingTimeService(); - List topics = listener.availableTopics(); + List topics = listener.topics(""); assertThat(topics).isEmpty(); listener.open(configuration, timeService); - topics = listener.availableTopics(); + topics = listener.topics(""); assertThat(topics).isEmpty(); + + listener.close(); } @Test @@ -62,41 +66,45 @@ void listenOnPartitions() throws Exception { operator().createTopic(topic, 6); List partitions = topicPartitions(topic, 6); - TopicMetadataListener listener = new TopicMetadataListener(partitions); + FixedTopicRegister listener = new FixedTopicRegister<>(partitions); long interval = Duration.ofMinutes(15).toMillis(); SinkConfiguration configuration = sinkConfiguration(interval); TestProcessingTimeService timeService = new TestProcessingTimeService(); - List topics = listener.availableTopics(); + List topics = listener.topics(""); assertEquals(topics, partitions); listener.open(configuration, timeService); - topics = listener.availableTopics(); + topics = listener.topics(""); assertEquals(topics, partitions); operator().increaseTopicPartitions(topic, 12); timeService.advance(interval); - topics = listener.availableTopics(); + topics = listener.topics(""); assertEquals(topics, partitions); + + listener.close(); } @Test - void fetchTopicPartitionInformation() { + void fetchTopicPartitionInformation() throws IOException { String topic = randomAlphabetic(10); operator().createTopic(topic, 8); - TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic)); + FixedTopicRegister listener = new 
FixedTopicRegister<>(singletonList(topic)); SinkConfiguration configuration = sinkConfiguration(Duration.ofMinutes(10).toMillis()); TestProcessingTimeService timeService = new TestProcessingTimeService(); - List topics = listener.availableTopics(); + List topics = listener.topics(""); assertThat(topics).isEmpty(); listener.open(configuration, timeService); - topics = listener.availableTopics(); + topics = listener.topics(""); List desiredTopics = topicPartitions(topic, 8); assertThat(topics).hasSize(8).isEqualTo(desiredTopics); + + listener.close(); } @Test @@ -106,13 +114,13 @@ void fetchTopicPartitionUpdate() throws Exception { long interval = Duration.ofMinutes(20).toMillis(); - TopicMetadataListener listener = new TopicMetadataListener(singletonList(topic)); + FixedTopicRegister listener = new FixedTopicRegister<>(singletonList(topic)); SinkConfiguration configuration = sinkConfiguration(interval); TestProcessingTimeService timeService = new TestProcessingTimeService(); timeService.setCurrentTime(System.currentTimeMillis()); listener.open(configuration, timeService); - List topics = listener.availableTopics(); + List topics = listener.topics(""); List desiredTopics = topicPartitions(topic, 8); assertThat(topics).isEqualTo(desiredTopics); @@ -121,25 +129,29 @@ void fetchTopicPartitionUpdate() throws Exception { operator().increaseTopicPartitions(topic, 16); timeService.advance(interval); - topics = listener.availableTopics(); + topics = listener.topics(""); desiredTopics = topicPartitions(topic, 16); assertThat(topics).isEqualTo(desiredTopics); + + listener.close(); } @Test - void fetchNonPartitionTopic() { + void fetchNonPartitionTopic() throws IOException { String topic = randomAlphabetic(10); operator().createTopic(topic, 0); List nonPartitionTopic = Collections.singletonList(topicName(topic)); - TopicMetadataListener listener = new TopicMetadataListener(nonPartitionTopic); + FixedTopicRegister listener = new FixedTopicRegister<>(nonPartitionTopic); long 
interval = Duration.ofMinutes(15).toMillis(); SinkConfiguration configuration = sinkConfiguration(interval); TestProcessingTimeService timeService = new TestProcessingTimeService(); listener.open(configuration, timeService); - List topics = listener.availableTopics(); + List topics = listener.topics(""); assertEquals(topics, nonPartitionTopic); + + listener.close(); } private List topicPartitions(String topic, int partitionSize) { diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 9a1aed78b3c76..2a2d4d34ecdfc 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -56,9 +56,6 @@ public class PulsarContainerRuntime implements PulsarRuntime { private static final String PULSAR_ADMIN_URL = String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT); - private static final String TXN_CONFIG_FILE = "containers/txnStandalone.conf"; - private static final String AUTH_CONFIG_FILE = "containers/authStandalone.conf"; - /** * Create a pulsar container provider by a predefined version, this constance {@link * DockerImageVersions#PULSAR} should be bumped after the new pulsar release. @@ -69,14 +66,9 @@ public class PulsarContainerRuntime implements PulsarRuntime { private boolean boundFlink = false; private PulsarRuntimeOperator operator; - private String configFile; public PulsarContainerRuntime(boolean authEnabled) { - if (authEnabled) { - configFile = AUTH_CONFIG_FILE; - } else { - configFile = TXN_CONFIG_FILE; - } + // TODO Add authentication support. 
} public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer flinkContainer) { @@ -110,7 +102,7 @@ public void startUp() { .forStatusCode(200) .withStartupTimeout(Duration.ofMinutes(5))); // Set custom startup script. - container.withCommand("/pulsar/bin/bootstrap.sh"); + container.withCommand("sh /pulsar/bin/bootstrap.sh"); // Start the Pulsar Container. container.start();