diff --git a/CHANGES.md b/CHANGES.md
index 8097d48d2935..186dc17a46b5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,7 @@
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Fixed read Beam rows from cross-lang transform (for example, ReadFromJdbc) involving negative 32-bit integers incorrectly decoded to large integers ([#34089](https://github.com/apache/beam/issues/34089))
+* (Java) Fixed SDF-based KafkaIO (ReadFromKafkaViaSDF) to properly handle custom deserializers that extend the Deserializer interface ([#34505](https://github.com/apache/beam/pull/34505))
 
 ## Security Fixes
 * Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 6ac206e27e85..ed97345d8e5d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1784,8 +1784,10 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
               .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
-              .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
-              .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
+              .withKeyDeserializerProviderAndCoder(
+                  kafkaRead.getKeyDeserializerProvider(), keyCoder)
+              .withValueDeserializerProviderAndCoder(
+                  kafkaRead.getValueDeserializerProvider(), valueCoder)
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
               .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2363,16 +2365,6 @@ public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers)
           ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
     }
 
-    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
-        @Nullable DeserializerProvider<K> deserializerProvider) {
-      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
-    }
-
-    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
-        @Nullable DeserializerProvider<V> deserializerProvider) {
-      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
-    }
-
     /**
      * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *
@@ -2386,6 +2378,31 @@ public ReadSourceDescriptors<K, V> withKeyDeserializer(
       return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
+     *
+     * <p>Use this method to override the coder inference performed within {@link
+     * #withKeyDeserializer(Class)}.
+     */
+    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        @Nullable DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
+        @Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) {
+      return toBuilder()
+          .setKeyDeserializerProvider(deserializerProvider)
+          .setKeyCoder(keyCoder)
+          .build();
+    }
+
     /**
      * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
      *
@@ -2399,18 +2416,6 @@ public ReadSourceDescriptors<K, V> withValueDeserializer(
       return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
-    /**
-     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
-     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
-     *
-     * <p>Use this method to override the coder inference performed within {@link
-     * #withKeyDeserializer(Class)}.
-     */
-    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
-        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
-      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
-    }
-
     /**
      * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
      * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
@@ -2423,6 +2428,19 @@ public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
       return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        @Nullable DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
+        @Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
+      return toBuilder()
+          .setValueDeserializerProvider(deserializerProvider)
+          .setValueCoder(valueCoder)
+          .build();
+    }
+
     /**
      * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
      * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 7455d5270969..6d0a2352bea2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assume.assumeFalse;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -41,6 +42,7 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -100,8 +102,10 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1117,4 +1121,157 @@ private static void setupKafkaContainer() {
     kafkaContainer.start();
     options.setKafkaBootstrapServerAddresses(kafkaContainer.getBootstrapServers());
   }
+
+  @Test
+  public void testCustomRowDeserializerWithViaSDF() throws IOException {
+    // This test verifies that the SDF implementation of KafkaIO correctly handles
+    // custom deserializers with explicit coders. It uses a Row deserializer, which
+    // requires an explicit coder to be provided since Beam cannot infer one automatically.
+    // The test ensures that both the deserializer and coder are properly passed to
+    // the ReadSourceDescriptors transform.
+
+    // Create a simple Row schema for the test
+    Schema testSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+
+    RowCoder rowCoder = RowCoder.of(testSchema);
+
+    // Set up sample data
+    String testId = UUID.randomUUID().toString();
+    String topicName = options.getKafkaTopic() + "-row-deserializer-" + testId;
+
+    // Create test data
+    Map<String, Row> testData = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      testData.put(
+          "key" + i,
+          Row.withSchema(testSchema)
+              .withFieldValue("field1", "value" + i)
+              .withFieldValue("field2", i)
+              .build());
+    }
+
+    // Write the test data to Kafka using a custom serializer
+    writePipeline
+        .apply("Create test data", Create.of(testData))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<String, Row>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(topicName)
+                .withKeySerializer(StringSerializer.class)
+                .withValueSerializer(RowSerializer.class)
+                .withProducerConfigUpdates(ImmutableMap.of("test.schema", testSchema.toString())));
+
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
+    // Read the data using SDF-based KafkaIO with a custom deserializer
+    PCollection<KV<String, Row>> resultSDF =
+        sdfReadPipeline.apply(
+            "Read from Kafka via SDF",
+            KafkaIO.<String, Row>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(topicName)
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "auto.offset.reset", "earliest", "test.schema", testSchema.toString()))
+                .withKeyDeserializer(StringDeserializer.class)
+                .withValueDeserializerAndCoder(RowDeserializer.class, rowCoder)
+                .withoutMetadata());
+
+    // Compare with the original data
+    PAssert.that(resultSDF)
+        .containsInAnyOrder(
+            testData.entrySet().stream()
+                .map(entry -> KV.of(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toList()));
+
+    // Run and verify
+    PipelineResult resultSDFResult = sdfReadPipeline.run();
+    PipelineResult.State resultSDFState =
+        resultSDFResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+    cancelIfTimeouted(resultSDFResult, resultSDFState);
+    assertNotEquals(PipelineResult.State.FAILED, resultSDFState);
+
+    // Clean up
+    tearDownTopic(topicName);
+  }
+
+  /** A custom serializer for Row objects. */
+  public static class RowSerializer implements Serializer<Row> {
+    private Schema schema;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+      String schemaString = (String) configs.get("test.schema");
+      if (schemaString != null) {
+        // For test purposes, rebuild the known schema rather than parsing the schema string
+        try {
+          this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+        } catch (Exception e) {
+          throw new RuntimeException("Error building schema", e);
+        }
+      }
+    }
+
+    @Override
+    public byte[] serialize(String topic, Row data) {
+      if (data == null) {
+        return null;
+      }
+      // Simple JSON serialization for test purposes
+      try {
+        // Ensure we're using the schema
+        if (schema != null && !schema.equals(data.getSchema())) {
+          throw new RuntimeException("Schema mismatch: " + schema + " vs " + data.getSchema());
+        }
+        return mapper.writeValueAsBytes(data.getValues());
+      } catch (Exception e) {
+        throw new RuntimeException("Error serializing Row to JSON", e);
+      }
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** A custom deserializer for Row objects. */
+  public static class RowDeserializer implements Deserializer<Row> {
+    private Schema schema;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+      String schemaString = (String) configs.get("test.schema");
+      if (schemaString != null) {
+        // For test purposes, rebuild the known schema rather than parsing the schema string
+        try {
+          this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+        } catch (Exception e) {
+          throw new RuntimeException("Error building schema", e);
+        }
+      }
+    }
+
+    @Override
+    public Row deserialize(String topic, byte[] data) {
+      if (data == null || schema == null) {
+        return null;
+      }
+      // Simple JSON deserialization for test purposes
+      try {
+        Object[] values = mapper.readValue(data, Object[].class);
+        return Row.withSchema(schema)
+            .withFieldValue("field1", values[0].toString())
+            .withFieldValue("field2", Integer.parseInt(values[1].toString()))
+            .build();
+      } catch (Exception e) {
+        throw new RuntimeException("Error deserializing JSON to Row", e);
+      }
+    }
+
+    @Override
+    public void close() {}
+  }
 }
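
A minimal caller-side sketch (not part of the patch) of the pattern the new integration test exercises: a custom Kafka Deserializer paired with an explicitly supplied Coder, which this change forwards into the SDF expansion via withValueDeserializerProviderAndCoder instead of re-inferring a coder there. The pipeline and bootstrapServers variables and the topic name are placeholders; RowDeserializer refers to the test helper added in KafkaIOIT.java above, and any Deserializer<Row> implementation would work the same way.

    import java.util.Collections;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Schema for the Row values; must match what the deserializer produces.
    Schema schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();

    PCollection<KV<String, Row>> rows =
        pipeline.apply(
            KafkaIO.<String, Row>read()
                .withBootstrapServers(bootstrapServers) // placeholder
                .withTopic("my-topic") // placeholder
                // The RowDeserializer test helper reads this setting in configure().
                .withConsumerConfigUpdates(
                    Collections.singletonMap("test.schema", schema.toString()))
                .withKeyDeserializer(StringDeserializer.class)
                // Beam cannot infer a Coder<Row> from the deserializer class, so pass one explicitly.
                .withValueDeserializerAndCoder(RowDeserializer.class, RowCoder.of(schema))
                .withoutMetadata());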