CHANGES.md: 1 addition, 0 deletions
@@ -88,6 +88,7 @@

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed reading Beam rows from a cross-language transform (for example, ReadFromJdbc) where negative 32-bit integers were incorrectly decoded to large integers ([#34089](https://github.com/apache/beam/issues/34089)).
* (Java) Fixed SDF-based KafkaIO (ReadFromKafkaViaSDF) to properly handle custom deserializers that extend the Deserializer<Row> interface ([#34505](https://github.com/apache/beam/pull/34505)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -1784,8 +1784,10 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
.withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
.withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
.withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
.withKeyDeserializerProviderAndCoder(
kafkaRead.getKeyDeserializerProvider(), keyCoder)
.withValueDeserializerProviderAndCoder(
kafkaRead.getValueDeserializerProvider(), valueCoder)
Comment on lines +1787 to +1790
@yyfhust (Contributor, Author) commented on Apr 2, 2025:
keyCoder and valueCoder must be non-null; they are resolved here:

Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);

either from user input or inferred from the CoderRegistry.

@yyfhust (Contributor, Author) commented on Apr 2, 2025:
This is at the entry point of KafkaIO, and it will always return a coder:

private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
  return (getValueCoder() != null)
      ? getValueCoder()
      : Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);
}

If a coder is given, it returns the coder specified by the user. If not, it attempts to retrieve a coder from the registry, which only has coders for the built-in deserializers.
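
For illustration, here is a minimal hypothetical sketch of the user-facing path this serves: a custom Deserializer<Row> has no coder in the registry, so the pipeline must pass one explicitly. The broker address, topic, and MyRowDeserializer below are illustrative stand-ins, not part of this PR.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExplicitRowCoderSketch {

  /** Hypothetical stand-in for any user class implementing Deserializer<Row>. */
  public static class MyRowDeserializer implements Deserializer<Row> {
    @Override
    public Row deserialize(String topic, byte[] data) {
      // A real implementation would decode the bytes into a Row matching the schema below.
      return null;
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Schema of the Row values; field names are illustrative only.
    Schema schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();

    // The CoderRegistry only knows coders for the built-in deserializers, so a custom
    // Deserializer<Row> needs an explicit coder. This PR forwards that coder to the
    // SDF-based read (ReadSourceDescriptors) as well.
    PCollection<KV<String, Row>> rows =
        pipeline.apply(
            KafkaIO.<String, Row>read()
                .withBootstrapServers("localhost:9092") // hypothetical broker
                .withTopic("my-topic") // hypothetical topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(MyRowDeserializer.class, RowCoder.of(schema))
                .withoutMetadata());

    pipeline.run().waitUntilFinish();
  }
}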

.withManualWatermarkEstimator()
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
@@ -2363,16 +2365,6 @@ public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers)
ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}

public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
@Nullable DeserializerProvider<K> deserializerProvider) {
return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
}

public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
@Nullable DeserializerProvider<V> deserializerProvider) {
return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
}

/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
@@ -2386,6 +2378,31 @@ public ReadSourceDescriptors<K, V> withKeyDeserializer(
return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
}

/**
* Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
*
* <p>Use this method to override the coder inference performed within {@link
* #withKeyDeserializer(Class)}.
*/
public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
}

public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
@Nullable DeserializerProvider<K> deserializerProvider) {
return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
}

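/**
 * Sets a {@link DeserializerProvider} to interpret key bytes read from Kafka, together with an
 * explicit {@link Coder} for key objects. Use this method to bypass coder inference from the
 * provider.
 */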
public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(
@Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) {
return toBuilder()
.setKeyDeserializerProvider(deserializerProvider)
.setKeyCoder(keyCoder)
.build();
}

/**
* Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
*
@@ -2399,18 +2416,6 @@ public ReadSourceDescriptors<K, V> withValueDeserializer(
return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
}

/**
* Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
*
* <p>Use this method to override the coder inference performed within {@link
* #withKeyDeserializer(Class)}.
*/
public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
}

/**
* Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
@@ -2423,6 +2428,19 @@ public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
}

public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
@Nullable DeserializerProvider<V> deserializerProvider) {
return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
}

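/**
 * Sets a {@link DeserializerProvider} to interpret value bytes read from Kafka, together with an
 * explicit {@link Coder} for value objects. Use this method to bypass coder inference from the
 * provider.
 */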
public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
@Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) {
return toBuilder()
.setValueDeserializerProvider(deserializerProvider)
.setValueCoder(valueCoder)
.build();
}

/**
* A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
* supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assume.assumeFalse;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
@@ -41,6 +42,7 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
@@ -100,8 +102,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -1117,4 +1121,157 @@ private static void setupKafkaContainer() {
kafkaContainer.start();
options.setKafkaBootstrapServerAddresses(kafkaContainer.getBootstrapServers());
}

@Test
public void testCustomRowDeserializerWithViaSDF() throws IOException {
// This test verifies that the SDF implementation of KafkaIO correctly handles
// custom deserializers with explicit coders. It uses a Row deserializer which
// requires an explicit coder to be provided since Beam cannot infer one automatically.
// The test ensures that both the deserializer and coder are properly passed to
// the ReadSourceDescriptors transform.

// Create a simple Row schema for test
Schema testSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();

RowCoder rowCoder = RowCoder.of(testSchema);

// Set up sample data
String testId = UUID.randomUUID().toString();
String topicName = options.getKafkaTopic() + "-row-deserializer-" + testId;

// Create test data
Map<String, Row> testData = new HashMap<>();
for (int i = 0; i < 5; i++) {
testData.put(
"key" + i,
Row.withSchema(testSchema)
.withFieldValue("field1", "value" + i)
.withFieldValue("field2", i)
.build());
}

// Write the test data to Kafka using a custom serializer
writePipeline
.apply("Create test data", Create.of(testData))
.apply(
"Write to Kafka",
KafkaIO.<String, Row>write()
.withBootstrapServers(options.getKafkaBootstrapServerAddresses())
.withTopic(topicName)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(RowSerializer.class)
.withProducerConfigUpdates(ImmutableMap.of("test.schema", testSchema.toString())));

PipelineResult writeResult = writePipeline.run();
writeResult.waitUntilFinish();

// Read the data using SDF-based KafkaIO with a custom deserializer
PCollection<KV<String, Row>> resultSDF =
sdfReadPipeline.apply(
"Read from Kafka via SDF",
KafkaIO.<String, Row>read()
.withBootstrapServers(options.getKafkaBootstrapServerAddresses())
.withTopic(topicName)
.withConsumerConfigUpdates(
ImmutableMap.of(
"auto.offset.reset", "earliest", "test.schema", testSchema.toString()))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(RowDeserializer.class, rowCoder)
.withoutMetadata());

// Compare with the original data
PAssert.that(resultSDF)
.containsInAnyOrder(
testData.entrySet().stream()
.map(entry -> KV.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()));

// Run and verify
PipelineResult resultSDFResult = sdfReadPipeline.run();
PipelineResult.State resultSDFState =
resultSDFResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
cancelIfTimeouted(resultSDFResult, resultSDFState);
assertNotEquals(PipelineResult.State.FAILED, resultSDFState);

// Clean up
tearDownTopic(topicName);
}

/** A custom serializer for Row objects. */
public static class RowSerializer implements Serializer<Row> {
private Schema schema;
private final ObjectMapper mapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String schemaString = (String) configs.get("test.schema");
if (schemaString != null) {
// For this test, rebuild the fixed test schema directly rather than parsing the string
try {
this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
} catch (Exception e) {
throw new RuntimeException("Error parsing schema", e);
}
}
}

@Override
public byte[] serialize(String topic, Row data) {
if (data == null) {
return null;
}
// Simple JSON serialization for test purposes
try {
// Ensure we're using the schema
if (schema != null && !schema.equals(data.getSchema())) {
throw new RuntimeException("Schema mismatch: " + schema + " vs " + data.getSchema());
}
return mapper.writeValueAsBytes(data.getValues());
} catch (Exception e) {
throw new RuntimeException("Error serializing Row to JSON", e);
}
}

@Override
public void close() {}
}

/** A custom deserializer for Row objects. */
public static class RowDeserializer implements Deserializer<Row> {
private Schema schema;
private final ObjectMapper mapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String schemaString = (String) configs.get("test.schema");
if (schemaString != null) {
// For this test, rebuild the fixed test schema directly rather than parsing the string
try {
this.schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
} catch (Exception e) {
throw new RuntimeException("Error parsing schema", e);
}
}
}

@Override
public Row deserialize(String topic, byte[] data) {
if (data == null || schema == null) {
return null;
}
// Simple JSON deserialization for test purposes
try {
Object[] values = mapper.readValue(data, Object[].class);
return Row.withSchema(schema)
.withFieldValue("field1", values[0].toString())
.withFieldValue("field2", Integer.parseInt(values[1].toString()))
.build();
} catch (Exception e) {
throw new RuntimeException("Error deserializing JSON to Row", e);
}
}

@Override
public void close() {}
}
}