Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,49 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.functions.api.Record;
import software.amazon.awssdk.services.kinesis.model.EncryptionType;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisRecord implements Record<byte[]> {
public static final String ARRIVAL_TIMESTAMP = "";
public static final String ENCRYPTION_TYPE = "";
public static final String PARTITION_KEY = "";
public static final String SEQUENCE_NUMBER = "";
public static final String ARRIVAL_TIMESTAMP = "kinesis.arrival.timestamp";
public static final String ENCRYPTION_TYPE = "kinesis.encryption.type";
public static final String PARTITION_KEY = "kinesis.partition.key";
public static final String SEQUENCE_NUMBER = "kinesis.sequence.number";
public static final String SHARD_ID = "kinesis.shard.id";
public static final String MILLIS_BEHIND_LATEST = "kinesis.millis.behind.latest";

private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
private final Optional<String> key;
private final byte[] value;
private final HashMap<String, String> userProperties = new HashMap<>();
public KinesisRecord(KinesisClientRecord record) {
public KinesisRecord(KinesisClientRecord record, String shardId, long millisBehindLatest,
Set<String> propertiesToInclude) {
this.key = Optional.of(record.partitionKey());
// encryption type can (annoyingly) be null, so we default to NONE
EncryptionType encType = EncryptionType.NONE;
if (record.encryptionType() != null) {
encType = record.encryptionType();
}
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
setProperty(ENCRYPTION_TYPE, encType.toString());
setProperty(PARTITION_KEY, record.partitionKey());
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());
if (propertiesToInclude.contains(ARRIVAL_TIMESTAMP)) {
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
}
if (propertiesToInclude.contains(ENCRYPTION_TYPE)) {
setProperty(ENCRYPTION_TYPE, encType.toString());
}
if (propertiesToInclude.contains(PARTITION_KEY)) {
setProperty(PARTITION_KEY, record.partitionKey());
}
if (propertiesToInclude.contains(SEQUENCE_NUMBER)) {
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());
}
if (propertiesToInclude.contains(SHARD_ID)) {
setProperty(SHARD_ID, shardId);
}
if (propertiesToInclude.contains(MILLIS_BEHIND_LATEST)) {
setProperty(MILLIS_BEHIND_LATEST, String.valueOf(millisBehindLatest));
}

if (encType == EncryptionType.NONE) {
String s = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.kinesis;

import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.exceptions.InvalidStateException;
Expand All @@ -43,11 +44,13 @@ public class KinesisRecordProcessor implements ShardRecordProcessor {
private final LinkedBlockingQueue<KinesisRecord> queue;
private long nextCheckpointTimeInNanos;
private String kinesisShardId;
private final Set<String> propertiesToInclude;
public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, KinesisSourceConfig config) {
this.queue = queue;
this.checkpointInterval = config.getCheckpointInterval();
this.numRetries = config.getNumRetries();
this.backoffTime = config.getBackoffTime();
this.propertiesToInclude = config.getPropertiesToInclude();
}

private void checkpoint(RecordProcessorCheckpointer checkpointer) {
Expand Down Expand Up @@ -82,16 +85,20 @@ private void checkpoint(RecordProcessorCheckpointer checkpointer) {
@Override
public void initialize(InitializationInput initializationInput) {
kinesisShardId = initializationInput.shardId();
log.info("Initializing KinesisRecordProcessor for shard {}. Config: checkpointInterval={}ms, numRetries={}, "
+ "backoffTime={}ms, propertiesToInclude={}",
kinesisShardId, checkpointInterval, numRetries, backoffTime, propertiesToInclude);
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {

log.info("Processing " + processRecordsInput.records().size() + " records from " + kinesisShardId);
Comment thread
shibd marked this conversation as resolved.
long millisBehindLatest = processRecordsInput.millisBehindLatest();

for (KinesisClientRecord record : processRecordsInput.records()) {
try {
queue.put(new KinesisRecord(record));
queue.put(new KinesisRecord(record, this.kinesisShardId, millisBehindLatest, propertiesToInclude));
} catch (InterruptedException e) {
log.warn("unable to create KinesisRecord ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
Expand Down Expand Up @@ -130,6 +134,16 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab
)
private boolean useEnhancedFanOut = true;

@FieldDoc(required = false,
defaultValue = "kinesis.arrival.timestamp,kinesis.encryption.type,kinesis.partition.key,"
+ "kinesis.sequence.number",
help = "A comma-separated list of Kinesis metadata properties to include in the Pulsar message properties."
+ " The supported properties are: kinesis.arrival.timestamp, kinesis.encryption.type, "
+ "kinesis.partition.key, kinesis.sequence.number, kinesis.shard.id, kinesis.millis.behind.latest")
private String kinesisRecordProperties = "kinesis.arrival.timestamp,kinesis.encryption.type,"
+ "kinesis.partition.key,kinesis.sequence.number";
private transient Set<String> propertiesToInclude;

public static KinesisSourceConfig load(Map<String, Object> config, SourceContext sourceContext) {
KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config,
KinesisSourceConfig.class, sourceContext);
Expand All @@ -143,6 +157,15 @@ && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
checkArgument((kinesisSourceConfig.getStartAtTime() != null),
"When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified");
}
if (isNotBlank(kinesisSourceConfig.getKinesisRecordProperties())) {
Set<String> properties = Arrays.stream(kinesisSourceConfig.getKinesisRecordProperties().split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toSet());
kinesisSourceConfig.setPropertiesToInclude(properties);
} else {
kinesisSourceConfig.setPropertiesToInclude(Collections.emptySet());
}
return kinesisSourceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kinesis;

import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.kinesis.model.EncryptionType;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisRecordTest {

private KinesisClientRecord mockRecord;
private final String shardId = "shard-001";
private final long millisBehindLatest = 12345L;
private final String partitionKey = "test-key";
private final String sequenceNumber = "seq-123";
private final Instant arrivalTimestamp = Instant.now();

@BeforeMethod
public void setup() {
mockRecord = Mockito.mock(KinesisClientRecord.class);
when(mockRecord.partitionKey()).thenReturn(partitionKey);
when(mockRecord.sequenceNumber()).thenReturn(sequenceNumber);
when(mockRecord.approximateArrivalTimestamp()).thenReturn(arrivalTimestamp);
when(mockRecord.encryptionType()).thenReturn(EncryptionType.NONE);
when(mockRecord.data()).thenReturn(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)));
}

@Test
public void testAllPropertiesIncluded() {
Set<String> propertiesToInclude = new HashSet<>(Arrays.asList(
KinesisRecord.ARRIVAL_TIMESTAMP,
KinesisRecord.ENCRYPTION_TYPE,
KinesisRecord.PARTITION_KEY,
KinesisRecord.SEQUENCE_NUMBER,
KinesisRecord.SHARD_ID,
KinesisRecord.MILLIS_BEHIND_LATEST
));

KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
Map<String, String> properties = kinesisRecord.getProperties();

assertEquals(properties.size(), 6);
assertEquals(properties.get(KinesisRecord.SHARD_ID), shardId);
assertEquals(properties.get(KinesisRecord.MILLIS_BEHIND_LATEST), String.valueOf(millisBehindLatest));
assertEquals(properties.get(KinesisRecord.PARTITION_KEY), partitionKey);
assertEquals(properties.get(KinesisRecord.SEQUENCE_NUMBER), sequenceNumber);
assertEquals(properties.get(KinesisRecord.ARRIVAL_TIMESTAMP), arrivalTimestamp.toString());
assertEquals(properties.get(KinesisRecord.ENCRYPTION_TYPE), EncryptionType.NONE.toString());
}

@Test
public void testSomePropertiesIncluded() {
Set<String> propertiesToInclude = new HashSet<>(Arrays.asList(
KinesisRecord.SHARD_ID,
KinesisRecord.SEQUENCE_NUMBER
));

KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
Map<String, String> properties = kinesisRecord.getProperties();

assertEquals(properties.size(), 2);
assertTrue(properties.containsKey(KinesisRecord.SHARD_ID));
assertTrue(properties.containsKey(KinesisRecord.SEQUENCE_NUMBER));

assertFalse(properties.containsKey(KinesisRecord.PARTITION_KEY));
assertFalse(properties.containsKey(KinesisRecord.ARRIVAL_TIMESTAMP));
assertFalse(properties.containsKey(KinesisRecord.ENCRYPTION_TYPE));
assertFalse(properties.containsKey(KinesisRecord.MILLIS_BEHIND_LATEST));
}

@Test
public void testNoPropertiesIncluded() {
Set<String> propertiesToInclude = Collections.emptySet();

KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude);
Map<String, String> properties = kinesisRecord.getProperties();

assertTrue(properties.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import java.io.IOException;
import java.time.ZoneOffset;
Expand All @@ -28,7 +29,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import java.util.Set;
import org.apache.pulsar.io.core.SourceContext;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -160,4 +161,55 @@ public final void missCloudWatchEndpointTest() {
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
}
}

@Test
public final void propertiesDefaultTest() {
Map<String, Object> map = new HashMap<>();
map.put("awsRegion", "us-west-1");
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");

SourceContext sourceContext = Mockito.mock(SourceContext.class);
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);

Set<String> properties = config.getPropertiesToInclude();
assertEquals(properties.size(), 4);
assertTrue(properties.contains("kinesis.sequence.number"));
Comment thread
shibd marked this conversation as resolved.
assertTrue(properties.contains("kinesis.arrival.timestamp"));
assertTrue(properties.contains("kinesis.encryption.type"));
assertTrue(properties.contains("kinesis.partition.key"));
}

@Test
public final void propertiesCustomTest() {
Map<String, Object> map = new HashMap<>();
map.put("awsRegion", "us-west-1");
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
// Set custom properties, note the extra whitespace to test trim()
map.put("kinesisRecordProperties", "kinesis.shard.id, kinesis.partition.key ");

SourceContext sourceContext = Mockito.mock(SourceContext.class);
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);

Set<String> properties = config.getPropertiesToInclude();
assertEquals(properties.size(), 2);
assertTrue(properties.contains("kinesis.shard.id"));
assertTrue(properties.contains("kinesis.partition.key"));
}

@Test
public final void propertiesEmptyTest() {
Map<String, Object> map = new HashMap<>();
map.put("awsRegion", "us-west-1");
map.put("awsKinesisStreamName", "my-stream");
map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
map.put("kinesisRecordProperties", "");

SourceContext sourceContext = Mockito.mock(SourceContext.class);
KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext);

Set<String> properties = config.getPropertiesToInclude();
assertTrue(properties.isEmpty());
}
}
Loading