Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public abstract class BaseKinesisConfig implements Serializable {
)
private String awsEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Cloudwatch end-point url. It can be found at "
+ "https://docs.aws.amazon.com/general/latest/gr/rande.html"
)
private String cloudwatchEndpoint = "";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

·cloudWatchEndpoint` ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@FieldDoc(
required = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.io.kinesis;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -49,7 +48,6 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
Expand Down Expand Up @@ -155,17 +153,16 @@ public void close() {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
this.sinkContext = sinkContext;

checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint())
|| isNotBlank(kinesisSinkConfig.getAwsRegion()),
"Either the aws-end-point or aws-region must be set");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param");

KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration();
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
}
if (isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())) {
kinesisConfig.setCloudwatchEndpoint(kinesisSinkConfig.getCloudwatchEndpoint());
}
if (kinesisSinkConfig.getAwsEndpointPort() != null) {
kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/
package org.apache.pulsar.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
Expand Down Expand Up @@ -103,9 +104,12 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
help = "The maximum delay(in milliseconds) between retries.")
private long retryMaxDelayInMillis = 60000;

public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
public static KinesisSinkConfig load(Map<String, Object> config, SinkContext sinkContext) {
KinesisSinkConfig kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion())
|| (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) && isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())),
"Either \"awsRegion\" must be set OR all of [\"awsEndpoint\", \"cloudwatchEndpoint\"] must be set.");
return kinesisSinkConfig;
}

public enum MessageFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
*/
package org.apache.pulsar.io.kinesis;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
Expand Down Expand Up @@ -68,18 +64,7 @@ public void close() throws Exception {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext);

checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint())
|| isNotBlank(kinesisSourceConfig.getAwsRegion()),
"Either the aws-end-point or aws-region must be set");
checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");

if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
checkArgument((kinesisSourceConfig.getStartAtTime() != null), "Timestamp must be specified");
}

this.kinesisSourceConfig = KinesisSourceConfig.load(config, sourceContext);
queue = new LinkedBlockingQueue<>(kinesisSourceConfig.getReceiveQueueSize());
workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
*/
package org.apache.pulsar.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.net.URI;
import java.util.Date;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab

@FieldDoc(
required = false,
defaultValue = "Apache Pulsar IO Connector",
defaultValue = "pulsar-kinesis",
help = "Name of the Amazon Kinesis application. By default the application name is included "
+ "in the user agent string used to make AWS requests. This can assist with troubleshooting "
+ "(e.g. distinguish requests made by separate connectors instances)."
Expand Down Expand Up @@ -122,24 +123,27 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
)
private String dynamoEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Cloudwatch end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
)
private String cloudwatchEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "true",
help = "When true, uses Kinesis enhanced fan-out, when false, uses polling"
)
private boolean useEnhancedFanOut = true;


public static KinesisSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
public static KinesisSourceConfig load(Map<String, Object> config, SourceContext sourceContext) {
KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config,
KinesisSourceConfig.class, sourceContext);
boolean isNotBlankEndpoint = isNotBlank(kinesisSourceConfig.getAwsEndpoint())
&& isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
&& isNotBlank(kinesisSourceConfig.getDynamoEndpoint());
checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) || isNotBlankEndpoint,
"Either \"awsRegion\" must be set OR all of "
+ "[ \"awsEndpoint\", \"cloudwatchEndpoint\", and \"dynamoEndpoint\" ] must be set.");
if (kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
checkArgument((kinesisSourceConfig.getStartAtTime() != null),
"When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified");
}
return kinesisSourceConfig;
}

public KinesisAsyncClient buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,16 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.mockito.Mockito;
import org.testng.annotations.Test;

public class KinesisSinkConfigTests {

@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("sinkConfig.yaml");
KinesisSinkConfig config = KinesisSinkConfig.load(yamlFile.getAbsolutePath());

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
assertEquals(config.getAwsRegion(), "us-east-1");
assertEquals(config.getAwsKinesisStreamName(), "my-stream");
assertEquals(config.getAwsCredentialPluginParam(),
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
assertEquals(config.getMessageFormat(), MessageFormat.ONLY_RAW_PAYLOAD);
assertEquals(true, config.isRetainOrdering());
}

@Test
public final void loadFromMapTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
Expand All @@ -58,7 +40,7 @@ public final void loadFromMapTest() throws IOException {
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");

SinkContext sinkContext = Mockito.mock(SinkContext.class);
KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
Expand All @@ -78,7 +60,7 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
Mockito.when(sinkContext.getSecret("awsCredentialPluginParam"))
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, KinesisSinkConfig.class, sinkContext);
KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
Expand All @@ -88,8 +70,13 @@ public final void loadFromMapCredentialFromSecretTest() throws IOException {
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
@Test(expectedExceptions = IllegalArgumentException.class)
public final void missCloudWatchEndpointTest() {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("awsEndpoint", "https://some.endpoint.aws");
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
SinkContext sinkContext = Mockito.mock(SinkContext.class);
KinesisSinkConfig.load(map, sinkContext);
}
}
Loading