From cb883c32587c9975381225b6ca7dc600714b84e2 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 20:58:37 -0400
Subject: [PATCH 01/10] fix-ReadFromKafkaViaSDF-bug-shall-set-coder

---
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 6ac206e27e85..835d6cfd14a5 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1786,6 +1786,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
+              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), KafkaRead.getValueCoder())
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2373,6 +2374,12 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
       return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
+        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
+    }
+
+
     /**
      * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *

From acd269d14121da800bbab9f8c674c0a930bd7563 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 22:14:54 -0400
Subject: [PATCH 02/10] typo

---
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 835d6cfd14a5..bc2ed7939690 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1786,7 +1786,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
-              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), KafkaRead.getValueCoder())
+              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(),kafkaRead.getValueCoder() )
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())

From 5babd9877d8e565c744e2d3daa931122dee970ab Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 22:24:38 -0400
Subject: [PATCH 03/10] shall be nullable

---
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index bc2ed7939690..c701b70afd2c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1786,7 +1786,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
-              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(),kafkaRead.getValueCoder() )
+              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(),kafkaRead.getValueCoder())
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2375,7 +2375,7 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
     }
 
     public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
-        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
+        @Nullable DeserializerProvider<V> deserializerProvider, @Nullable Coder<V> valueCoder) {
       return toBuilder().setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
     }
 

From 2618afaae1e87eb7b249cee01bb8880b3a190476 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 22:34:58 -0400
Subject: [PATCH 04/10] bug fix

---
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c701b70afd2c..79aa45d5e699 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1786,7 +1786,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
-              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(),kafkaRead.getValueCoder())
+              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), valueCoder)
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2375,7 +2375,7 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
     }
 
     public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
-        @Nullable DeserializerProvider<V> deserializerProvider, @Nullable Coder<V> valueCoder) {
+        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
       return toBuilder().setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
     }
 

From b63f02dbb5ab5a248d635e43ba64fddbd65b35fc Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 22:42:46 -0400
Subject: [PATCH 05/10] add key coder

---
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 79aa45d5e699..60fd99655144 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1785,6 +1785,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
+              .withKeyDeserializerProviderAndCoder(kafkaRead.getKeyDeserializerProvider(), keyCoder)
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
               .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), valueCoder)
               .withManualWatermarkEstimator()
@@ -2379,7 +2380,11 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
       return toBuilder().setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
     }
 
-
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
+        @Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).setKeyCoder(keyCoder).build();
+    }
+
     /**
      * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *

From d5e2dad1e75de3b38441aff74a07f1df65d1b997 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Tue, 1 Apr 2025 22:49:49 -0400
Subject: [PATCH 06/10] format

---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 60fd99655144..aa58aaf3495e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1785,9 +1785,11 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
-              .withKeyDeserializerProviderAndCoder(kafkaRead.getKeyDeserializerProvider(), keyCoder)
+              .withKeyDeserializerProviderAndCoder(
+                  kafkaRead.getKeyDeserializerProvider(), keyCoder)
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
-              .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), valueCoder)
+              .withValueDeserializerProviderAndCoder(
+                  kafkaRead.getValueDeserializerProvider(), valueCoder)
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2377,12 +2379,18 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
 
     public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
         @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
-      return toBuilder().setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
+      return toBuilder()
+          .setValueDeserializerProvider(deserializerProvider)
+          .setValueCoder(valueCoder)
+          .build();
     }
 
     public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
         @Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) {
-      return toBuilder().setKeyDeserializerProvider(deserializerProvider).setKeyCoder(keyCoder).build();
+      return toBuilder()
+          .setKeyDeserializerProvider(deserializerProvider)
+          .setKeyCoder(keyCoder)
+          .build();
     }
 
     /**

From 8619d645ac04c90a13820d4af153c38e8cd189b0 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Wed, 2 Apr 2025 08:36:59 -0400
Subject: [PATCH 07/10] remove redudancy

---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 76 +++++++++----------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index aa58aaf3495e..ed97345d8e5d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1784,10 +1784,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
               .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
-              .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withKeyDeserializerProviderAndCoder(
                   kafkaRead.getKeyDeserializerProvider(), keyCoder)
-              .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
               .withValueDeserializerProviderAndCoder(
                   kafkaRead.getValueDeserializerProvider(), valueCoder)
               .withManualWatermarkEstimator()
@@ -2367,22 +2365,34 @@ public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers)
           ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
     }
 
-    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
-        @Nullable DeserializerProvider<K> deserializerProvider) {
-      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
+     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
-    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
-        @Nullable DeserializerProvider<V> deserializerProvider) {
-      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
+     *
+     * <p>Use this method to override the coder inference performed within {@link
+     * #withKeyDeserializer(Class)}.
+     */
+    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
-    public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
-        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
-      return toBuilder()
-          .setValueDeserializerProvider(deserializerProvider)
-          .setValueCoder(valueCoder)
-          .build();
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        @Nullable DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
     }
 
     public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
@@ -2393,19 +2403,6 @@ public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
           .build();
     }
 
-    /**
-     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
-     *
-     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
-     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
-     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
-     * provide the key coder explicitly.
-     */
-    public ReadSourceDescriptors<K, V> withKeyDeserializer(
-        Class<? extends Deserializer<K>> keyDeserializer) {
-      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
-    }
-
     /**
      * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
      *
@@ -2419,18 +2416,6 @@ public ReadSourceDescriptors<K, V> withValueDeserializer(
       return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
-    /**
-     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
-     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
-     *
-     * <p>Use this method to override the coder inference performed within {@link
-     * #withKeyDeserializer(Class)}.
-     */
-    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
-        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
-      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
-    }
-
     /**
      * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
      * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
@@ -2443,6 +2428,19 @@ public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
       return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        @Nullable DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
+        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
+      return toBuilder()
+          .setValueDeserializerProvider(deserializerProvider)
+          .setValueCoder(valueCoder)
+          .build();
+    }
+
     /**
      * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
      * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.

From a17f3ff175f577b1b96ee9efd93152f376e7ad48 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Wed, 2 Apr 2025 08:56:51 -0400
Subject: [PATCH 08/10] add change log in version 2.65.0

---
 CHANGES.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES.md b/CHANGES.md
index f3874359b7b0..4e5f80d4cb50 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -81,6 +81,7 @@
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Fixed read Beam rows from cross-lang transform (for example, ReadFromJdbc) involving negative 32-bit integers incorrectly decoded to large integers ([#34089](https://github.com/apache/beam/issues/34089))
+* Fixed SDF-based KafkaIO (ReadFromKafkaViaSDF) to support custom deserializer that extends Deserializer interface ([#34505](https://github.com/apache/beam/pull/34505))
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

From bdcd0c9ae41150a04961a746ff0074f3ec63749b Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Wed, 2 Apr 2025 09:09:08 -0400
Subject: [PATCH 09/10] update change log

---
 CHANGES.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 4e5f80d4cb50..35d5011a74b4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -81,7 +81,7 @@
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Fixed read Beam rows from cross-lang transform (for example, ReadFromJdbc) involving negative 32-bit integers incorrectly decoded to large integers ([#34089](https://github.com/apache/beam/issues/34089))
-* Fixed SDF-based KafkaIO (ReadFromKafkaViaSDF) to support custom deserializer that extends Deserializer interface ([#34505](https://github.com/apache/beam/pull/34505))
+* (Java) Fixed SDF-based KafkaIO (ReadFromKafkaViaSDF) to properly handle custom deserializers that extend Deserializer interface([#34505](https://github.com/apache/beam/pull/34505))
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
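Before the integration test lands in the final patch, it is worth pinning down the user-facing behavior this series repairs: on the ReadFromKafkaViaSDF expansion path, a coder supplied through KafkaIO.Read's withValueDeserializerAndCoder was previously not forwarded, so coder inference re-ran against the Deserializer class and failed for deserializers that only implement the Kafka Deserializer interface. The following sketch of the affected usage is not part of the patch series; MyValue, MyValueDeserializer, the topic name, and the bootstrap address are illustrative placeholders, and a runner (e.g. the DirectRunner) is assumed on the classpath:

    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadWithExplicitCoder {

      /** A value type whose coder Beam cannot infer from the Deserializer class alone. */
      public static class MyValue implements Serializable {
        public final String payload;

        public MyValue(String payload) {
          this.payload = payload;
        }
      }

      /** A custom deserializer that only implements the Kafka Deserializer interface. */
      public static class MyValueDeserializer implements Deserializer<MyValue> {
        @Override
        public MyValue deserialize(String topic, byte[] data) {
          return data == null ? null : new MyValue(new String(data, StandardCharsets.UTF_8));
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<KV<String, MyValue>> records =
            p.apply(
                KafkaIO.<String, MyValue>read()
                    .withBootstrapServers("localhost:9092") // placeholder address
                    .withTopic("my-topic") // placeholder topic
                    .withKeyDeserializer(StringDeserializer.class)
                    // The explicit coder below is what the SDF expansion used to drop;
                    // after this series it is forwarded to ReadSourceDescriptors via
                    // withValueDeserializerProviderAndCoder.
                    .withValueDeserializerAndCoder(
                        MyValueDeserializer.class, SerializableCoder.of(MyValue.class))
                    .withoutMetadata());
        p.run().waitUntilFinish();
      }
    }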
From a041492bd95840dcf8743efaae0a941b0c123d43 Mon Sep 17 00:00:00 2001
From: Yifan Ye
Date: Fri, 11 Apr 2025 15:51:13 -0400
Subject: [PATCH 10/10] add integration test

---
 .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 157 ++++++++++++++++++
 1 file changed, 157 insertions(+)

diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 7455d5270969..6d0a2352bea2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assume.assumeFalse;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -41,6 +42,7 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -100,8 +102,10 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1117,4 +1121,157 @@ private static void setupKafkaContainer() {
     kafkaContainer.start();
     options.setKafkaBootstrapServerAddresses(kafkaContainer.getBootstrapServers());
   }
+
+  @Test
+  public void testCustomRowDeserializerWithViaSDF() throws IOException {
+    // This test verifies that the SDF implementation of KafkaIO correctly handles
+    // custom deserializers with explicit coders. It uses a Row deserializer which
+    // requires an explicit coder to be provided since Beam cannot infer one automatically.
+    // The test ensures that both the deserializer and coder are properly passed to
+    // the ReadSourceDescriptors transform.
+
+    // Create a simple Row schema for test
+    Schema testSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+
+    RowCoder rowCoder = RowCoder.of(testSchema);
+
+    // Set up sample data
+    String testId = UUID.randomUUID().toString();
+    String topicName = options.getKafkaTopic() + "-row-deserializer-" + testId;
+
+    // Create test data
+    Map<String, Row> testData = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      testData.put(
+          "key" + i,
+          Row.withSchema(testSchema)
+              .withFieldValue("field1", "value" + i)
+              .withFieldValue("field2", i)
+              .build());
+    }
+
+    // Write the test data to Kafka using a custom serializer
+    writePipeline
+        .apply("Create test data", Create.of(testData))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<String, Row>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(topicName)
+                .withKeySerializer(StringSerializer.class)
+                .withValueSerializer(RowSerializer.class)
+                .withProducerConfigUpdates(ImmutableMap.of("test.schema", testSchema.toString())));
+
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
+    // Read the data using SDF-based KafkaIO with a custom deserializer
+    PCollection<KV<String, Row>> resultSDF =
+        sdfReadPipeline.apply(
+            "Read from Kafka via SDF",
+            KafkaIO.<String, Row>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(topicName)
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "auto.offset.reset", "earliest", "test.schema", testSchema.toString()))
+                .withKeyDeserializer(StringDeserializer.class)
+                .withValueDeserializerAndCoder(RowDeserializer.class, rowCoder)
+                .withoutMetadata());
+
+    // Compare with the original data
+    PAssert.that(resultSDF)
+        .containsInAnyOrder(
+            testData.entrySet().stream()
+                .map(entry -> KV.of(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toList()));
+
+    // Run and verify
+    PipelineResult resultSDFResult = sdfReadPipeline.run();
+    PipelineResult.State resultSDFState =
+        resultSDFResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+    cancelIfTimeouted(resultSDFResult, resultSDFState);
+    assertNotEquals(PipelineResult.State.FAILED, resultSDFState);
+
+    // Clean up
+    tearDownTopic(topicName);
+  }
+
+  /** A custom serializer for Row objects. */
+  public static class RowSerializer implements Serializer<Row> {
+    private Schema schema;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+      String schemaString = (String) configs.get("test.schema");
+      if (schemaString != null) {
+        // Use a more direct method to parse schema from string
+        try {
+          this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+        } catch (Exception e) {
+          throw new RuntimeException("Error parsing schema", e);
+        }
+      }
+    }
+
+    @Override
+    public byte[] serialize(String topic, Row data) {
+      if (data == null) {
+        return null;
+      }
+      // Simple JSON serialization for test purposes
+      try {
+        // Ensure we're using the schema
+        if (schema != null && !schema.equals(data.getSchema())) {
+          throw new RuntimeException("Schema mismatch: " + schema + " vs " + data.getSchema());
+        }
+        return mapper.writeValueAsBytes(data.getValues());
+      } catch (Exception e) {
+        throw new RuntimeException("Error serializing Row to JSON", e);
+      }
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** A custom deserializer for Row objects. */
+  public static class RowDeserializer implements Deserializer<Row> {
+    private Schema schema;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+      String schemaString = (String) configs.get("test.schema");
+      if (schemaString != null) {
+        // Use a more direct method to parse schema from string
+        try {
+          this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+        } catch (Exception e) {
+          throw new RuntimeException("Error parsing schema", e);
+        }
+      }
+    }
+
+    @Override
+    public Row deserialize(String topic, byte[] data) {
+      if (data == null || schema == null) {
+        return null;
+      }
+      // Simple JSON deserialization for test purposes
+      try {
+        Object[] values = mapper.readValue(data, Object[].class);
+        return Row.withSchema(schema)
+            .withFieldValue("field1", values[0].toString())
+            .withFieldValue("field2", Integer.parseInt(values[1].toString()))
+            .build();
+      } catch (Exception e) {
+        throw new RuntimeException("Error deserializing JSON to Row", e);
+      }
+    }
+
+    @Override
+    public void close() {}
+  }
 }
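For readers skimming the integration test above, its RowSerializer/RowDeserializer pair round-trips Row values through positional JSON. That round trip can be sanity-checked standalone, with no Kafka broker involved; a minimal sketch, assuming only beam-sdks-java-core and Jackson on the classpath:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class RowJsonRoundTrip {
      public static void main(String[] args) throws Exception {
        Schema schema =
            Schema.builder().addStringField("field1").addInt32Field("field2").build();
        Row original =
            Row.withSchema(schema)
                .withFieldValue("field1", "value0")
                .withFieldValue("field2", 0)
                .build();

        ObjectMapper mapper = new ObjectMapper();
        // Serialize the field values positionally, as the test's RowSerializer does.
        byte[] bytes = mapper.writeValueAsBytes(original.getValues());

        // Rebuild the Row by position, as the test's RowDeserializer does.
        Object[] values = mapper.readValue(bytes, Object[].class);
        Row decoded =
            Row.withSchema(schema)
                .withFieldValue("field1", values[0].toString())
                .withFieldValue("field2", Integer.parseInt(values[1].toString()))
                .build();

        System.out.println(original.equals(decoded)); // expected: true
      }
    }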