From 386513fcc886cf4432fdbc179ae7822d411c2f45 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 16 Aug 2023 15:15:59 +0800 Subject: [PATCH 1/2] [improve][io] Improve kinesis connector config. --- .../kinesis/AwsCredentialProviderPlugin.java | 29 ---------- .../AwsDefaultProviderChainPlugin.java | 30 ---------- .../pulsar/io/kinesis/BaseKinesisConfig.java | 7 +++ .../apache/pulsar/io/kinesis/KinesisSink.java | 17 +++--- .../pulsar/io/kinesis/KinesisSinkConfig.java | 18 +++--- .../pulsar/io/kinesis/KinesisSource.java | 17 +----- .../io/kinesis/KinesisSourceConfig.java | 36 ++++++------ .../kinesis/STSAssumeRoleProviderPlugin.java | 31 ---------- .../io/kinesis/KinesisSinkConfigTests.java | 34 ++++------- .../io/kinesis/KinesisSourceConfigTests.java | 57 ++++++------------- .../src/test/resources/sinkConfig.yaml | 27 --------- .../src/test/resources/sourceConfig.yaml | 32 ----------- 12 files changed, 73 insertions(+), 262 deletions(-) delete mode 100644 pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java delete mode 100644 pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java delete mode 100644 pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java delete mode 100644 pulsar-io/kinesis/src/test/resources/sinkConfig.yaml delete mode 100644 pulsar-io/kinesis/src/test/resources/sourceConfig.yaml diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java deleted file mode 100644 index e88a952293b4f..0000000000000 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.kinesis; - -/** - * This is a stub class for backwards compatibility. In new code and configurations, please use the plugins - * from org.apache.pulsar.io.aws - * - * @see org.apache.pulsar.io.aws.AwsCredentialProviderPlugin - */ -@Deprecated -public interface AwsCredentialProviderPlugin extends org.apache.pulsar.io.aws.AwsCredentialProviderPlugin { -} diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java deleted file mode 100644 index 75952a71a29a0..0000000000000 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.kinesis; - -/** - * This is a stub class for backwards compatibility. In new code and configurations, please use the plugins - * from org.apache.pulsar.io.aws - * - * @see org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin - */ -@Deprecated -public class AwsDefaultProviderChainPlugin extends org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin - implements AwsCredentialProviderPlugin { -} diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java index c9c951ae2b70e..7bd95b0d6e3ab 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java @@ -35,6 +35,13 @@ public abstract class BaseKinesisConfig implements Serializable { ) private String awsEndpoint = ""; + @FieldDoc( + required = false, + defaultValue = "", + help = "Cloudwatch end-point url. It can be found at " + + "https://docs.aws.amazon.com/general/latest/gr/rande.html" + ) + private String cloudwatchEndpoint = ""; @FieldDoc( required = false, diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index fb8eedff82f0a..d8e4e4bab85e5 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.io.kinesis; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.addCallback; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -49,7 +48,6 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.aws.AbstractAwsConnector; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; -import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.Connector; @@ -155,17 +153,16 @@ public void close() { @Override public void open(Map config, SinkContext sinkContext) { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext); + kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext); this.sinkContext = sinkContext; - checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name"); - checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()) - || isNotBlank(kinesisSinkConfig.getAwsRegion()), - "Either the aws-end-point or aws-region must be set"); - checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param"); - KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration(); - kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint()); + if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) { + kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint()); + } + if (isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())) { + kinesisConfig.setCloudwatchEndpoint(kinesisSinkConfig.getCloudwatchEndpoint()); + } if (kinesisSinkConfig.getAwsEndpointPort() != null) { kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort()); } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index c5b26a26d0cf2..0905d211d4d23 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -18,13 +18,14 @@ */ package org.apache.pulsar.io.kinesis; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import java.io.File; -import java.io.IOException; +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.Serializable; +import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -103,9 +104,12 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable help = "The maximum delay(in milliseconds) between retries.") private long retryMaxDelayInMillis = 60000; - public static KinesisSinkConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class); + public static KinesisSinkConfig load(Map config, SinkContext sinkContext) { + KinesisSinkConfig kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext); + checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion()) + || (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) && isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())), + "Either the awsEndpoint, cloudwatchEndpoint or awsRegion must be set"); + return kinesisSinkConfig; } public enum MessageFormat { diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java index 2412244e1b5dc..279368db2a028 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.io.kinesis; -import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.net.InetAddress; import java.util.Map; import java.util.UUID; @@ -27,14 +25,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.io.aws.AbstractAwsConnector; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; -import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; -import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.RetrievalConfig; @@ -68,18 +64,7 @@ public void close() throws Exception { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext); - - checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty kinesis-stream name"); - checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint()) - || isNotBlank(kinesisSourceConfig.getAwsRegion()), - "Either the aws-end-point or aws-region must be set"); - checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param"); - - if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) { - checkArgument((kinesisSourceConfig.getStartAtTime() != null), "Timestamp must be specified"); - } - + this.kinesisSourceConfig = KinesisSourceConfig.load(config, sourceContext); queue = new LinkedBlockingQueue<>(kinesisSourceConfig.getReceiveQueueSize()); workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java index f0bf7cfc9781d..a94658a035731 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java @@ -18,16 +18,17 @@ */ package org.apache.pulsar.io.kinesis; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import java.io.File; -import java.io.IOException; +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.Serializable; import java.net.URI; import java.util.Date; +import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder; @@ -76,7 +77,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab @FieldDoc( required = false, - defaultValue = "Apache Pulsar IO Connector", + defaultValue = "pulsar-kinesis", help = "Name of the Amazon Kinesis application. By default the application name is included " + "in the user agent string used to make AWS requests. This can assist with troubleshooting " + "(e.g. distinguish requests made by separate connectors instances)." @@ -122,13 +123,6 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab ) private String dynamoEndpoint = ""; - @FieldDoc( - required = false, - defaultValue = "", - help = "Cloudwatch end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html" - ) - private String cloudwatchEndpoint = ""; - @FieldDoc( required = false, defaultValue = "true", @@ -136,10 +130,20 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab ) private boolean useEnhancedFanOut = true; - - public static KinesisSourceConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class); + public static KinesisSourceConfig load(Map config, SourceContext sourceContext) { + KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, + KinesisSourceConfig.class, sourceContext); + checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) + || (isNotBlank(kinesisSourceConfig.getAwsEndpoint()) + && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint()) + && isNotBlank(kinesisSourceConfig.getDynamoEndpoint())), + "Either the awsEndpoint, cloudwatchEndpoint, dynamoEndpoint or awsRegion must be set"); + checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param"); + if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) { + checkArgument((kinesisSourceConfig.getStartAtTime() != null), + "When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified"); + } + return kinesisSourceConfig; } public KinesisAsyncClient buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) { diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java deleted file mode 100644 index e305c9c9b9fe2..0000000000000 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.kinesis; - -/** - * This is a stub class for backwards compatibility. In new code and configurations, please use the plugins - * from org.apache.pulsar.io.aws - * - * @see org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin - */ -@Deprecated -public class STSAssumeRoleProviderPlugin extends org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin - implements AwsCredentialProviderPlugin { -} - diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java index 6f76d9e69a211..d23c26a3a95fa 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java @@ -21,34 +21,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat; import org.mockito.Mockito; import org.testng.annotations.Test; public class KinesisSinkConfigTests { - @Test - public final void loadFromYamlFileTest() throws IOException { - File yamlFile = getFile("sinkConfig.yaml"); - KinesisSinkConfig config = KinesisSinkConfig.load(yamlFile.getAbsolutePath()); - - assertNotNull(config); - assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); - assertEquals(config.getAwsRegion(), "us-east-1"); - assertEquals(config.getAwsKinesisStreamName(), "my-stream"); - assertEquals(config.getAwsCredentialPluginParam(), - "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); - assertEquals(config.getMessageFormat(), MessageFormat.ONLY_RAW_PAYLOAD); - assertEquals(true, config.isRetainOrdering()); - } - @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap (); @@ -58,7 +40,7 @@ public final void loadFromMapTest() throws IOException { map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); SinkContext sinkContext = Mockito.mock(SinkContext.class); - KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext); + KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); @@ -78,7 +60,7 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException { SinkContext sinkContext = Mockito.mock(SinkContext.class); Mockito.when(sinkContext.getSecret("awsCredentialPluginParam")) .thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); - KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext); + KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); @@ -88,8 +70,14 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException { "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); } - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Either the awsEndpoint, cloudwatchEndpoint or awsRegion must be set") + public final void missCloudWatchEndpointTest() { + Map map = new HashMap (); + map.put("awsEndpoint", "https://some.endpoint.aws"); + map.put("awsKinesisStreamName", "my-stream"); + map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + KinesisSinkConfig.load(map, sinkContext); } } diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java index f6b0666d34ba3..23dbf71908c0c 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import java.io.File; import java.io.IOException; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -30,7 +29,6 @@ import java.util.HashMap; import java.util.Map; -import org.apache.pulsar.io.common.IOConfigUtils; import org.apache.pulsar.io.core.SourceContext; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -54,30 +52,6 @@ public class KinesisSourceConfigTests { DAY = then.getTime(); } - @Test - public final void loadFromYamlFileTest() throws IOException { - File yamlFile = getFile("sourceConfig.yaml"); - KinesisSourceConfig config = KinesisSourceConfig.load(yamlFile.getAbsolutePath()); - assertNotNull(config); - assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); - assertEquals(config.getAwsRegion(), "us-east-1"); - assertEquals(config.getAwsKinesisStreamName(), "my-stream"); - assertEquals(config.getAwsCredentialPluginParam(), - "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); - assertEquals(config.getApplicationName(), "My test application"); - assertEquals(config.getCheckpointInterval(), 30000); - assertEquals(config.getBackoffTime(), 4000); - assertEquals(config.getNumRetries(), 3); - assertEquals(config.getReceiveQueueSize(), 2000); - assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); - - Calendar cal = Calendar.getInstance(); - cal.setTime(config.getStartAtTime()); - ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC); - ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC); - assertEquals(actual, expected); - } - @Test public final void loadFromMapTest() throws IOException { Map map = new HashMap (); @@ -89,12 +63,11 @@ public final void loadFromMapTest() throws IOException { map.put("backoffTime", "4000"); map.put("numRetries", "3"); map.put("receiveQueueSize", 2000); - map.put("applicationName", "My test application"); map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON); map.put("startAtTime", DAY); SourceContext sourceContext = Mockito.mock(SourceContext.class); - KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSourceConfig.class, sourceContext); + KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); assertNotNull(config); assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); @@ -102,7 +75,7 @@ public final void loadFromMapTest() throws IOException { assertEquals(config.getAwsKinesisStreamName(), "my-stream"); assertEquals(config.getAwsCredentialPluginParam(), "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); - assertEquals(config.getApplicationName(), "My test application"); + assertEquals(config.getApplicationName(), "pulsar-kinesis"); assertEquals(config.getCheckpointInterval(), 30000); assertEquals(config.getBackoffTime(), 4000); assertEquals(config.getNumRetries(), 3); @@ -133,7 +106,7 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException { SourceContext sourceContext = Mockito.mock(SourceContext.class); Mockito.when(sourceContext.getSecret("awsCredentialPluginParam")) .thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); - KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSourceConfig.class, sourceContext); + KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); assertNotNull(config); assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); @@ -156,19 +129,17 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException { } @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "empty aws-credential param") + expectedExceptionsMessageRegExp = "awsCredentialPluginParam cannot be null") public final void missingCredentialsTest() throws Exception { Map map = new HashMap (); map.put("awsEndpoint", "https://some.endpoint.aws"); map.put("awsRegion", "us-east-1"); map.put("awsKinesisStreamName", "my-stream"); - - KinesisSource source = new KinesisSource(); - source.open(map, null); + KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class)); } @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Timestamp must be specified") + expectedExceptionsMessageRegExp = "When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified") public final void missingStartTimeTest() throws Exception { Map map = new HashMap (); map.put("awsEndpoint", "https://some.endpoint.aws"); @@ -177,13 +148,17 @@ public final void missingStartTimeTest() throws Exception { map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); map.put("initialPositionInStream", InitialPositionInStream.AT_TIMESTAMP); - - KinesisSource source = new KinesisSource(); - source.open(map, null); + KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class)); } - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Either the awsEndpoint, cloudwatchEndpoint, dynamoEndpoint or awsRegion must be set") + public final void missCloudWatchEndpointTest() { + Map map = new HashMap (); + map.put("awsEndpoint", "https://some.endpoint.aws"); + map.put("awsKinesisStreamName", "my-stream"); + map.put("awsCredentialPluginParam", + "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class)); } } diff --git a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml b/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml deleted file mode 100644 index 7d99db65d079a..0000000000000 --- a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -{ - "awsEndpoint" : "https://some.endpoint.aws", - "awsRegion": "us-east-1", - "awsKinesisStreamName": "my-stream", - "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}", - "messageFormat": "ONLY_RAW_PAYLOAD", - "retainOrdering": "true" -} \ No newline at end of file diff --git a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml b/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml deleted file mode 100644 index 64b564486c18c..0000000000000 --- a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -{ - "awsEndpoint" : "https://some.endpoint.aws", - "awsRegion": "us-east-1", - "awsKinesisStreamName": "my-stream", - "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}", - "applicationName": "My test application", - "checkpointInterval": "30000", - "backoffTime":"4000", - "numRetries":"3", - "receiveQueueSize": 2000, - "initialPositionInStream": "TRIM_HORIZON", - "startAtTime": "2019-03-05T19:28:58.000Z" -} \ No newline at end of file From 2e3bfa674449858b3cc0e44b371181df9f1a00a6 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 16 Aug 2023 21:28:16 +0800 Subject: [PATCH 2/2] Fix code reviews. --- .../apache/pulsar/io/kinesis/KinesisSinkConfig.java | 2 +- .../pulsar/io/kinesis/KinesisSourceConfig.java | 12 ++++++------ .../pulsar/io/kinesis/KinesisSinkConfigTests.java | 3 +-- .../pulsar/io/kinesis/KinesisSourceConfigTests.java | 3 +-- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index 0905d211d4d23..f81fd32134be2 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -108,7 +108,7 @@ public static KinesisSinkConfig load(Map config, SinkContext sin KinesisSinkConfig kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext); checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion()) || (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) && isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())), - "Either the awsEndpoint, cloudwatchEndpoint or awsRegion must be set"); + "Either \"awsRegion\" must be set OR all of [\"awsEndpoint\", \"cloudwatchEndpoint\"] must be set."); return kinesisSinkConfig; } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java index a94658a035731..0dd9bfce9e0c2 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java @@ -133,12 +133,12 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab public static KinesisSourceConfig load(Map config, SourceContext sourceContext) { KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext); - checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) - || (isNotBlank(kinesisSourceConfig.getAwsEndpoint()) - && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint()) - && isNotBlank(kinesisSourceConfig.getDynamoEndpoint())), - "Either the awsEndpoint, cloudwatchEndpoint, dynamoEndpoint or awsRegion must be set"); - checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param"); + boolean isNotBlankEndpoint = isNotBlank(kinesisSourceConfig.getAwsEndpoint()) + && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint()) + && isNotBlank(kinesisSourceConfig.getDynamoEndpoint()); + checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) || isNotBlankEndpoint, + "Either \"awsRegion\" must be set OR all of " + + "[ \"awsEndpoint\", \"cloudwatchEndpoint\", and \"dynamoEndpoint\" ] must be set."); if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) { checkArgument((kinesisSourceConfig.getStartAtTime() != null), "When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified"); diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java index d23c26a3a95fa..a5051927ace54 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java @@ -70,8 +70,7 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException { "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); } - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Either the awsEndpoint, cloudwatchEndpoint or awsRegion must be set") + @Test(expectedExceptions = IllegalArgumentException.class) public final void missCloudWatchEndpointTest() { Map map = new HashMap (); map.put("awsEndpoint", "https://some.endpoint.aws"); diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java index 23dbf71908c0c..4ba3593b1d9b8 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java @@ -151,8 +151,7 @@ public final void missingStartTimeTest() throws Exception { KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class)); } - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Either the awsEndpoint, cloudwatchEndpoint, dynamoEndpoint or awsRegion must be set") + @Test(expectedExceptions = IllegalArgumentException.class) public final void missCloudWatchEndpointTest() { Map map = new HashMap (); map.put("awsEndpoint", "https://some.endpoint.aws");