Skip to content

[Bug]: ReadFromKafkaViaSDF does not set coder when using withValueDeserializerAndCoder #34506

@yyfhust

Description

@yyfhust

What happened?

encountered a bug when using KafkaIO with a customized Deserializer that extends Deserializer<Row>. We built this Deserializer to deserialize byte arrays into Beam Rows with a schema we specified.

When using KafkaIO, we use withValueDeserializerAndCoder.

It runs without any issues with legacy Kafka IO ReadFromKafkaViaUnbounded. However, it fails when using ReadFromKafkaViaSDF. The error originates here.

Screenshot 2025-04-01 at 9 05 09 PM

This happens because currently ReadFromKafkaViaSDF does not set the coder even if we explicitly provide both the deserializer and the coder using withValueDeserializerAndCoder. Since no coder is explicitly set, Beam infers the type from the deserializer (

private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
return (getValueCoder() != null)
? getValueCoder()
: Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);
). This is typically not an issue when using Beam's built-in deserializers:

Screenshot 2025-04-01 at 11 12 48 PM

However, if we use a customized deserializer, such as foo_bar_Deserializer implements Deserializer<Row>, Beam will be unable to infer the coder and will throw an error.

  • Legacy KafkaIO (ReadFromKafkaViaUnbounded) sets both the deserializer and the coder based on the input.
  • KafkaIO implemented using SDF (ReadFromKafkaViaSDF) currently does not set the coder explicitly. It does not pass the coder and use it, instead relying on inferring the coder from the Deserializer, which will throw an error as there is no coder for Row in the registry for obvious reasons.

To resolve this issue, we need to explicitly set the coder based on the input.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions