diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 8ef5665af2e07..df943f93b830f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -586,7 +586,7 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (newConfig.getInputs() != null) { newConfig.getInputs().forEach((topicName -> { - newConfig.getInputSpecs().put(topicName, + newConfig.getInputSpecs().putIfAbsent(topicName, ConsumerConfig.builder().isRegexPattern(false).build()); })); } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 676f452a84335..62d6f68d31b2c 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import com.google.gson.Gson; +import java.util.ArrayList; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; @@ -277,6 +278,24 @@ public void testMergeDifferentInputSpec() { assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(), 1000); } + @Test + public void testMergeDifferentInputSpecWithInputsSet() { + SinkConfig sinkConfig = createSinkConfig(); + sinkConfig.getInputSpecs().put("test-input", ConsumerConfig.builder().isRegexPattern(false).receiverQueueSize(1000).build()); + + Map inputSpecs = new HashMap<>(); + ConsumerConfig newConsumerConfig = ConsumerConfig.builder().isRegexPattern(false).serdeClassName("test-serde").receiverQueueSize(58).build(); + inputSpecs.put("test-input", newConsumerConfig); + SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs); + newSinkConfig.setInputs(new ArrayList<>()); + newSinkConfig.getInputs().add("test-input"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals(mergedConfig.getInputSpecs().get("test-input"), newConsumerConfig); + + // make sure original sinkConfig was not modified + assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(), 1000); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered") public void testMergeDifferentProcessingGuarantees() { SinkConfig sinkConfig = createSinkConfig();