From bd7bd57ecae3a17f2975fdb609558d49c8804e11 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Wed, 20 Jul 2022 15:29:24 +0800 Subject: [PATCH] [FLINK-26931][Connector/Pulsar] Make the producer name and consumer name unique for each instance. --- .../common/config/PulsarClientFactory.java | 18 ++++++++++-------- .../common/config/PulsarConfiguration.java | 2 +- .../pulsar/sink/PulsarSinkBuilder.java | 5 +++++ .../sink/config/PulsarSinkConfigUtils.java | 6 +++++- .../pulsar/source/PulsarSourceBuilder.java | 7 ++++++- .../source/config/PulsarSourceConfigUtils.java | 6 +++++- 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java index b1214b57c04f6..1f01b242143ca 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java @@ -153,7 +153,7 @@ public static PulsarClient createClient(PulsarConfiguration configuration) { /** * PulsarAdmin shares almost the same configuration with PulsarClient, but we separate this - * create method for directly creating it. + * creating method for directly use it. */ public static PulsarAdmin createAdmin(PulsarConfiguration configuration) { PulsarAdminBuilder builder = PulsarAdmin.builder(); @@ -200,15 +200,17 @@ private static Authentication createAuthentication(PulsarConfiguration configura String authParamsString = configuration.get(PULSAR_AUTH_PARAMS); return sneakyClient( () -> AuthenticationFactory.create(authPluginClassName, authParamsString)); - } else if (configuration.contains(PULSAR_AUTH_PARAM_MAP)) { - Map paramsMap = configuration.get(PULSAR_AUTH_PARAM_MAP); + } else { + Map paramsMap = configuration.getProperties(PULSAR_AUTH_PARAM_MAP); + if (paramsMap.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "No %s or %s provided", + PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key())); + } + return sneakyClient( () -> AuthenticationFactory.create(authPluginClassName, paramsMap)); - } else { - throw new IllegalArgumentException( - String.format( - "No %s or %s provided", - PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key())); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java index 3e64c6643cd62..0681a3ef4cba4 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java @@ -76,7 +76,7 @@ public Map getProperties(ConfigOption> optio return properties; } - /** Get an option value from the given config, convert it into the a new value instance. */ + /** Get an option value from the given config, convert it into a new value instance. */ public T get(ConfigOption option, Function convertor) { F value = get(option); if (value != null) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index 96ca03876a4fd..e623f29406725 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -373,6 +373,11 @@ public PulsarSink build() { if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) { LOG.warn( "We recommend set a readable producer name through setProducerName(String) in production mode."); + } else { + String producerName = configBuilder.get(PULSAR_PRODUCER_NAME); + if (!producerName.contains("%s")) { + configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s"); + } } checkNotNull(serializationSchema, "serializationSchema must be set."); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java index 77155bbea6b74..e4c48162589f3 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.Schema; import java.util.Map; +import java.util.UUID; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -77,7 +78,10 @@ public static ProducerBuilder createProducerBuilder( PulsarClient client, Schema schema, SinkConfiguration configuration) { ProducerBuilder builder = client.newProducer(schema); - configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName); + configuration.useOption( + PULSAR_PRODUCER_NAME, + producerName -> String.format(producerName, UUID.randomUUID()), + builder::producerName); configuration.useOption( PULSAR_SEND_TIMEOUT_MS, Math::toIntExact, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 37dd01ee946d0..7e6842a76cdc9 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -515,9 +515,14 @@ public PulsarSource build() { if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) { LOG.warn( "We recommend set a readable consumer name through setConsumerName(String) in production mode."); + } else { + String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME); + if (!consumerName.contains("%s")) { + configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s"); + } } - // Since these implementation could be a lambda, make sure they are serializable. + // Since these implementations could be a lambda, make sure they are serializable. checkState(isSerializable(startCursor), "StartCursor isn't serializable"); checkState(isSerializable(stopCursor), "StopCursor isn't serializable"); checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable"); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java index adb8a03df0328..602a1577938e1 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Optional; +import java.util.UUID; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -106,7 +107,10 @@ public static ConsumerBuilder createConsumerBuilder( configuration.useOption( PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS, builder::maxTotalReceiverQueueSizeAcrossPartitions); - configuration.useOption(PULSAR_CONSUMER_NAME, builder::consumerName); + configuration.useOption( + PULSAR_CONSUMER_NAME, + consumerName -> String.format(consumerName, UUID.randomUUID()), + builder::consumerName); configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted); configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel); configuration.useOption(