From 72eab1ac7ac7df830ace94bcaa4156e48c263a0f Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 9 Jul 2025 20:02:22 +0800 Subject: [PATCH 1/2] [fix][io] Make record properties configurable for kinesis source --- .../pulsar/io/kinesis/KinesisRecord.java | 36 ++++-- .../io/kinesis/KinesisRecordProcessor.java | 9 +- .../io/kinesis/KinesisSourceConfig.java | 23 ++++ .../pulsar/io/kinesis/KinesisRecordTest.java | 110 ++++++++++++++++++ .../io/kinesis/KinesisSourceConfigTests.java | 54 ++++++++- 5 files changed, 220 insertions(+), 12 deletions(-) create mode 100644 pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java index 754b1e070f492..187e5ae9985f1 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java @@ -24,31 +24,49 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.functions.api.Record; import software.amazon.awssdk.services.kinesis.model.EncryptionType; import software.amazon.kinesis.retrieval.KinesisClientRecord; public class KinesisRecord implements Record { - public static final String ARRIVAL_TIMESTAMP = ""; - public static final String ENCRYPTION_TYPE = ""; - public static final String PARTITION_KEY = ""; - public static final String SEQUENCE_NUMBER = ""; + public static final String ARRIVAL_TIMESTAMP = "kinesis.arrival.timestamp"; + public static final String ENCRYPTION_TYPE = "kinesis.encryption.type"; + public static final String PARTITION_KEY = "kinesis.partition.key"; + public static final String SEQUENCE_NUMBER = "kinesis.sequence.number"; + public static final String SHARD_ID = "kinesis.shard.id"; + public static final String MILLIS_BEHIND_LATEST = "kinesis.millis.behind.latest"; private static final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder(); private final Optional key; private final byte[] value; private final HashMap userProperties = new HashMap<>(); - public KinesisRecord(KinesisClientRecord record) { + public KinesisRecord(KinesisClientRecord record, String shardId, long millisBehindLatest, + Set propertiesToInclude) { this.key = Optional.of(record.partitionKey()); // encryption type can (annoyingly) be null, so we default to NONE EncryptionType encType = EncryptionType.NONE; if (record.encryptionType() != null) { encType = record.encryptionType(); } - setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString()); - setProperty(ENCRYPTION_TYPE, encType.toString()); - setProperty(PARTITION_KEY, record.partitionKey()); - setProperty(SEQUENCE_NUMBER, record.sequenceNumber()); + if (propertiesToInclude.contains(ARRIVAL_TIMESTAMP)) { + setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString()); + } + if (propertiesToInclude.contains(ENCRYPTION_TYPE)) { + setProperty(ENCRYPTION_TYPE, encType.toString()); + } + if (propertiesToInclude.contains(PARTITION_KEY)) { + setProperty(PARTITION_KEY, record.partitionKey()); + } + if (propertiesToInclude.contains(SEQUENCE_NUMBER)) { + setProperty(SEQUENCE_NUMBER, record.sequenceNumber()); + } + if (propertiesToInclude.contains(SHARD_ID)) { + setProperty(SHARD_ID, shardId); + } + if (propertiesToInclude.contains(MILLIS_BEHIND_LATEST)) { + setProperty(MILLIS_BEHIND_LATEST, String.valueOf(millisBehindLatest)); + } if (encType == EncryptionType.NONE) { String s = null; diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java index 60dd263eab459..b51ac544be24c 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.kinesis; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.exceptions.InvalidStateException; @@ -43,11 +44,13 @@ public class KinesisRecordProcessor implements ShardRecordProcessor { private final LinkedBlockingQueue queue; private long nextCheckpointTimeInNanos; private String kinesisShardId; + private final Set propertiesToInclude; public KinesisRecordProcessor(LinkedBlockingQueue queue, KinesisSourceConfig config) { this.queue = queue; this.checkpointInterval = config.getCheckpointInterval(); this.numRetries = config.getNumRetries(); this.backoffTime = config.getBackoffTime(); + this.propertiesToInclude = config.getPropertiesToInclude(); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { @@ -82,16 +85,20 @@ private void checkpoint(RecordProcessorCheckpointer checkpointer) { @Override public void initialize(InitializationInput initializationInput) { kinesisShardId = initializationInput.shardId(); + log.info("Initializing KinesisRecordProcessor for shard {}. Config: checkpointInterval={}ms, numRetries={}, " + + "backoffTime={}ms, propertiesToInclude={}", + kinesisShardId, checkpointInterval, numRetries, backoffTime, propertiesToInclude); } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { log.info("Processing " + processRecordsInput.records().size() + " records from " + kinesisShardId); + long millisBehindLatest = processRecordsInput.millisBehindLatest(); for (KinesisClientRecord record : processRecordsInput.records()) { try { - queue.put(new KinesisRecord(record)); + queue.put(new KinesisRecord(record, this.kinesisShardId, millisBehindLatest, propertiesToInclude)); } catch (InterruptedException e) { log.warn("unable to create KinesisRecord ", e); } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java index 0dd9bfce9e0c2..83cd70a4d64ba 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java @@ -22,8 +22,12 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.Serializable; import java.net.URI; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; @@ -130,6 +134,16 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab ) private boolean useEnhancedFanOut = true; + @FieldDoc(required = false, + defaultValue = "kinesis.arrival.timestamp,kinesis.encryption.type,kinesis.partition.key," + + "kinesis.sequence.number,kinesis.shard.id,kinesis.millis.behind.latest", + help = "A comma-separated list of Kinesis metadata properties to include in the Pulsar message properties." + + " The supported properties are: kinesis.arrival.timestamp, kinesis.encryption.type, " + + "kinesis.partition.key, kinesis.sequence.number, kinesis.shard.id, kinesis.millis.behind.latest") + private String kinesisRecordProperties = "kinesis.arrival.timestamp,kinesis.encryption.type," + + "kinesis.partition.key,kinesis.sequence.number,kinesis.shard.id,kinesis.millis.behind.latest"; + private transient Set propertiesToInclude; + public static KinesisSourceConfig load(Map config, SourceContext sourceContext) { KinesisSourceConfig kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, KinesisSourceConfig.class, sourceContext); @@ -143,6 +157,15 @@ && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint()) checkArgument((kinesisSourceConfig.getStartAtTime() != null), "When initialPositionInStream is AT_TIMESTAMP, startAtTime must be specified"); } + if (isNotBlank(kinesisSourceConfig.getKinesisRecordProperties())) { + Set properties = Arrays.stream(kinesisSourceConfig.getKinesisRecordProperties().split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toSet()); + kinesisSourceConfig.setPropertiesToInclude(properties); + } else { + kinesisSourceConfig.setPropertiesToInclude(Collections.emptySet()); + } return kinesisSourceConfig; } diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java new file mode 100644 index 0000000000000..bf32720a795f7 --- /dev/null +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kinesis; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.kinesis.model.EncryptionType; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +public class KinesisRecordTest { + + private KinesisClientRecord mockRecord; + private final String shardId = "shard-001"; + private final long millisBehindLatest = 12345L; + private final String partitionKey = "test-key"; + private final String sequenceNumber = "seq-123"; + private final Instant arrivalTimestamp = Instant.now(); + + @BeforeMethod + public void setup() { + mockRecord = Mockito.mock(KinesisClientRecord.class); + when(mockRecord.partitionKey()).thenReturn(partitionKey); + when(mockRecord.sequenceNumber()).thenReturn(sequenceNumber); + when(mockRecord.approximateArrivalTimestamp()).thenReturn(arrivalTimestamp); + when(mockRecord.encryptionType()).thenReturn(EncryptionType.NONE); + when(mockRecord.data()).thenReturn(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void testAllPropertiesIncluded() { + Set propertiesToInclude = new HashSet<>(Arrays.asList( + KinesisRecord.ARRIVAL_TIMESTAMP, + KinesisRecord.ENCRYPTION_TYPE, + KinesisRecord.PARTITION_KEY, + KinesisRecord.SEQUENCE_NUMBER, + KinesisRecord.SHARD_ID, + KinesisRecord.MILLIS_BEHIND_LATEST + )); + + KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude); + Map properties = kinesisRecord.getProperties(); + + assertEquals(properties.size(), 6); + assertEquals(properties.get(KinesisRecord.SHARD_ID), shardId); + assertEquals(properties.get(KinesisRecord.MILLIS_BEHIND_LATEST), String.valueOf(millisBehindLatest)); + assertEquals(properties.get(KinesisRecord.PARTITION_KEY), partitionKey); + assertEquals(properties.get(KinesisRecord.SEQUENCE_NUMBER), sequenceNumber); + assertEquals(properties.get(KinesisRecord.ARRIVAL_TIMESTAMP), arrivalTimestamp.toString()); + assertEquals(properties.get(KinesisRecord.ENCRYPTION_TYPE), EncryptionType.NONE.toString()); + } + + @Test + public void testSomePropertiesIncluded() { + Set propertiesToInclude = new HashSet<>(Arrays.asList( + KinesisRecord.SHARD_ID, + KinesisRecord.SEQUENCE_NUMBER + )); + + KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude); + Map properties = kinesisRecord.getProperties(); + + assertEquals(properties.size(), 2); + assertTrue(properties.containsKey(KinesisRecord.SHARD_ID)); + assertTrue(properties.containsKey(KinesisRecord.SEQUENCE_NUMBER)); + + assertFalse(properties.containsKey(KinesisRecord.PARTITION_KEY)); + assertFalse(properties.containsKey(KinesisRecord.ARRIVAL_TIMESTAMP)); + assertFalse(properties.containsKey(KinesisRecord.ENCRYPTION_TYPE)); + assertFalse(properties.containsKey(KinesisRecord.MILLIS_BEHIND_LATEST)); + } + + @Test + public void testNoPropertiesIncluded() { + Set propertiesToInclude = Collections.emptySet(); + + KinesisRecord kinesisRecord = new KinesisRecord(mockRecord, shardId, millisBehindLatest, propertiesToInclude); + Map properties = kinesisRecord.getProperties(); + + assertTrue(properties.isEmpty()); + } +} \ No newline at end of file diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java index 4ba3593b1d9b8..fb341245d3386 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import java.io.IOException; import java.time.ZoneOffset; @@ -28,7 +29,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; - +import java.util.Set; import org.apache.pulsar.io.core.SourceContext; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -160,4 +161,53 @@ public final void missCloudWatchEndpointTest() { "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class)); } -} + + @Test + public final void propertiesDefaultTest() { + Map map = new HashMap<>(); + map.put("awsRegion", "us-west-1"); + map.put("awsKinesisStreamName", "my-stream"); + map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); + + Set properties = config.getPropertiesToInclude(); + assertEquals(properties.size(), 6); + assertTrue(properties.contains("kinesis.shard.id")); + assertTrue(properties.contains("kinesis.sequence.number")); + } + + @Test + public final void propertiesCustomTest() { + Map map = new HashMap<>(); + map.put("awsRegion", "us-west-1"); + map.put("awsKinesisStreamName", "my-stream"); + map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + // Set custom properties, note the extra whitespace to test trim() + map.put("kinesisRecordProperties", "kinesis.shard.id, kinesis.partition.key "); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); + + Set properties = config.getPropertiesToInclude(); + assertEquals(properties.size(), 2); + assertTrue(properties.contains("kinesis.shard.id")); + assertTrue(properties.contains("kinesis.partition.key")); + } + + @Test + public final void propertiesEmptyTest() { + Map map = new HashMap<>(); + map.put("awsRegion", "us-west-1"); + map.put("awsKinesisStreamName", "my-stream"); + map.put("awsCredentialPluginParam", "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + map.put("kinesisRecordProperties", ""); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); + + Set properties = config.getPropertiesToInclude(); + assertTrue(properties.isEmpty()); + } +} \ No newline at end of file From 42fc07854c34ea305eb860f1ed3c1e824dd48f55 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 10 Jul 2025 10:31:01 +0800 Subject: [PATCH 2/2] default value retains all previously available properties to ensure backward compatibility --- .../org/apache/pulsar/io/kinesis/KinesisSourceConfig.java | 4 ++-- .../apache/pulsar/io/kinesis/KinesisSourceConfigTests.java | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java index 83cd70a4d64ba..ebcd06bce12f7 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java @@ -136,12 +136,12 @@ public class KinesisSourceConfig extends BaseKinesisConfig implements Serializab @FieldDoc(required = false, defaultValue = "kinesis.arrival.timestamp,kinesis.encryption.type,kinesis.partition.key," - + "kinesis.sequence.number,kinesis.shard.id,kinesis.millis.behind.latest", + + "kinesis.sequence.number", help = "A comma-separated list of Kinesis metadata properties to include in the Pulsar message properties." + " The supported properties are: kinesis.arrival.timestamp, kinesis.encryption.type, " + "kinesis.partition.key, kinesis.sequence.number, kinesis.shard.id, kinesis.millis.behind.latest") private String kinesisRecordProperties = "kinesis.arrival.timestamp,kinesis.encryption.type," - + "kinesis.partition.key,kinesis.sequence.number,kinesis.shard.id,kinesis.millis.behind.latest"; + + "kinesis.partition.key,kinesis.sequence.number"; private transient Set propertiesToInclude; public static KinesisSourceConfig load(Map config, SourceContext sourceContext) { diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java index fb341245d3386..4f64722efde43 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java @@ -173,9 +173,11 @@ public final void propertiesDefaultTest() { KinesisSourceConfig config = KinesisSourceConfig.load(map, sourceContext); Set properties = config.getPropertiesToInclude(); - assertEquals(properties.size(), 6); - assertTrue(properties.contains("kinesis.shard.id")); + assertEquals(properties.size(), 4); assertTrue(properties.contains("kinesis.sequence.number")); + assertTrue(properties.contains("kinesis.arrival.timestamp")); + assertTrue(properties.contains("kinesis.encryption.type")); + assertTrue(properties.contains("kinesis.partition.key")); } @Test