diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index ed029d8de5f85..109e8963f743d 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -50,6 +50,9 @@ import java.util.Properties; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_ENCRYPTION_KEYS; @@ -323,6 +326,35 @@ public PulsarSinkBuilder enableTopicAutoCreation(int partitionSize) { return this; } + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParamsString string which represents parameters for the Authentication-Plugin, + * e.g., "key1:val1,key2:val2" + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder setAuthentication( + String authPluginClassName, String authParamsString) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParams map which represents parameters for the Authentication-Plugin + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder setAuthentication( + String authPluginClassName, Map authParams) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams); + return this; + } + /** * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found * in {@link PulsarSinkOptions} and {@link PulsarOptions}. diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index b76e2dee3cfcc..b42e1caa885d4 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -48,11 +48,15 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; import static java.lang.Boolean.FALSE; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME; @@ -407,6 +411,35 @@ public PulsarSourceBuilder setDeserializationSchema( return self; } + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParamsString string which represents parameters for the Authentication-Plugin, + * e.g., "key1:val1,key2:val2" + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder setAuthentication( + String authPluginClassName, String authParamsString) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParams map which represents parameters for the Authentication-Plugin + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder setAuthentication( + String authPluginClassName, Map authParams) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams); + return this; + } + /** * Sets a {@link CryptoKeyReader}. Configure the key reader to be used to decrypt the message * payloads.