
[Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource #26329

@piter75

What happened?

Since #22718, our streaming pipelines on Google Cloud Dataflow, which are built on Spotify's Scio, fail with the AvroCodec exception while reading data from BigQuery (with TypedRead).
The last released version of Beam that works properly is 2.42.0, and we cannot upgrade some of our pipelines beyond it because of this issue.

We read GenericRecords from a temporary BigQuery table and apply a parseFn function to them to create arbitrary (non-Avro) types, which is effectively Case 3 from the table described in AvroSource.Mode.

// pseudo code: T is an arbitrary (non-Avro) type

BigQueryIO
        .read(parseFn)       // parseFn: SerializableFunction<SchemaAndRecord, T>
        .withCoder(coder)    // coder: Coder<T>

I analysed the issue. It is complex, but the gist is as follows:

  1. #22718 ("Support custom avro DatumReader when reading from BigQuery") adds the ability to use a custom AvroSource.DatumReaderFactory implementation for reading from BigQuery;
  2. it creates a "default" / "backwards compatibility" implementation and uses it in BigQueryIO.read;
  3. this "default" implementation in fact uses the parseFn function (supplied to BigQueryIO.read) to return the parsed type from the custom DatumReader;
  4. however, it does not (and cannot) propagate the output Coder to the AvroSource used for reading the data;
  5. Dataflow (in streaming mode) wraps the AvroSource in the UnboundedReadFromBoundedSource wrapper to use it as an UnboundedSource;
  6. along the way, it tries to get the output Coder from the underlying AvroSource to use it as the CheckpointCoder for checkpointing;
  7. AvroSource has no knowledge that parseFn is being used, so it returns an AvroCoder instance, which cannot encode arbitrary (non-Avro) types.
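The chain of events above can be illustrated with a toy model. This is only a sketch: ToyCoder, ToyAvroCoder, ToySource, ToyRecord, UserType and checkpoint are hypothetical stand-ins invented for illustration and are not Beam classes. A null propagatedCoder models the case where the source never learns about the user's Coder and falls back to an Avro-style coder.

```java
import java.util.function.Function;

/** Toy model of the failure; every type here is hypothetical, not a Beam class. */
final class CoderMismatchSketch {

    /** Stand-in for a Beam Coder. */
    interface ToyCoder<T> {
        String encode(T value);
    }

    /** Stand-in for a GenericRecord read from the temporary table. */
    static final class ToyRecord {
        final String name;
        ToyRecord(String name) { this.name = name; }
    }

    /** Arbitrary non-Avro type produced by parseFn. */
    static final class UserType {
        final int nameLength;
        UserType(int nameLength) { this.nameLength = nameLength; }
    }

    /** Stand-in for AvroCoder: it only knows how to encode records. */
    static final class ToyAvroCoder<T> implements ToyCoder<T> {
        @Override
        public String encode(T value) {
            if (!(value instanceof ToyRecord)) {
                throw new IllegalArgumentException(
                    "not an Avro-like record: " + value.getClass().getSimpleName());
            }
            return "avro:" + ((ToyRecord) value).name;
        }
    }

    /** Stand-in for a source whose reader silently applies parseFn. */
    static final class ToySource<T> {
        private final Function<ToyRecord, T> hiddenParseFn;
        private final ToyCoder<T> propagatedCoder; // null models "coder not propagated"

        ToySource(Function<ToyRecord, T> hiddenParseFn, ToyCoder<T> propagatedCoder) {
            this.hiddenParseFn = hiddenParseFn;
            this.propagatedCoder = propagatedCoder;
        }

        T read(ToyRecord record) {
            return hiddenParseFn.apply(record);
        }

        /** Falls back to the Avro-style coder when nothing was propagated. */
        ToyCoder<T> getOutputCoder() {
            return propagatedCoder != null ? propagatedCoder : new ToyAvroCoder<T>();
        }
    }

    /** Stand-in for the wrapper that checkpoints elements with the source's coder. */
    static <T> String checkpoint(ToySource<T> source, ToyRecord record) {
        T element = source.read(record);
        return source.getOutputCoder().encode(element);
    }

    public static void main(String[] args) {
        Function<ToyRecord, UserType> parseFn = r -> new UserType(r.name.length());
        ToyCoder<UserType> userCoder = u -> "user:" + u.nameLength;
        ToyRecord row = new ToyRecord("abc");

        // Coder propagated: checkpointing uses the user's coder and succeeds.
        System.out.println(checkpoint(new ToySource<UserType>(parseFn, userCoder), row));

        // Coder not propagated: the Avro-style fallback rejects the parsed type.
        try {
            checkpoint(new ToySource<UserType>(parseFn, null), row);
        } catch (IllegalArgumentException e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

In this model the failure only surfaces when something asks the source for its coder, which would be consistent with the problem appearing in streaming mode, where the wrapper needs a coder for checkpointing.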

The biggest issue I see is that AvroSource enforces a contract between using parseFn and supplying the output Coder, and that contract is broken by moving the responsibility for applying parseFn into GenericDatumTransformer.

I am thinking about contributing a fix and am considering the following solution:

  1. removing BigQueryIO.GenericDatumTransformer;
  2. bringing parseFn back to the BigQuerySourceBase hierarchy;
  3. simplifying the datumReaderFactory type to AvroSource.DatumReaderFactory<T> and no longer applying parseFn in it;
  4. adding validation that only one of parseFn or datumReaderFactory is used. I believe the purpose of a custom DatumReader is to read SpecificRecords and output them without additional parsing;
  5. creating AvroSources in BigQuerySourceBase.createSources according to which parameter was actually provided.
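Steps 4 and 5 could look roughly like the sketch below. It is a toy model, not Beam code: ToyCoder, ToySourceSpec and createSource are hypothetical names, both parameters are modelled as plain java.util.function.Function values, and two points are assumptions rather than part of the proposal: that exactly one of the two parameters must be set, and that the parseFn path requires an explicit coder while the datumReaderFactory path may fall back to an Avro-derived one.

```java
import java.util.Objects;
import java.util.function.Function;

/** Toy sketch of the proposed validation; all types are hypothetical, not Beam classes. */
final class SourceSelectionSketch {

    /** Stand-in for a Beam Coder. */
    interface ToyCoder<T> {
        String encode(T value);
    }

    /** Describes which source variant would be created and with which coder. */
    static final class ToySourceSpec<T> {
        final String mode;
        final ToyCoder<T> outputCoder;

        ToySourceSpec(String mode, ToyCoder<T> outputCoder) {
            this.mode = mode;
            this.outputCoder = outputCoder;
        }
    }

    /**
     * Picks the source variant from whichever parameter was provided.
     * Assumption: exactly one of parseFn / datumReaderFactory is non-null.
     */
    static <T> ToySourceSpec<T> createSource(
            Function<String, T> parseFn,
            Function<String, T> datumReaderFactory,
            ToyCoder<T> coder) {
        if ((parseFn == null) == (datumReaderFactory == null)) {
            throw new IllegalArgumentException(
                "Exactly one of parseFn or datumReaderFactory must be provided");
        }
        if (parseFn != null) {
            // parseFn can yield arbitrary types, so the caller's coder must reach the source.
            Objects.requireNonNull(coder, "parseFn requires an explicit output coder");
            return new ToySourceSpec<>("parseFn", coder);
        }
        // Assumed: a custom reader emits Avro records, so an Avro-derived coder is enough.
        ToyCoder<T> avroLike = value -> "avro:" + value;
        return new ToySourceSpec<>("datumReaderFactory", coder != null ? coder : avroLike);
    }

    public static void main(String[] args) {
        Function<String, Integer> parseFn = Integer::parseInt;
        ToyCoder<Integer> intCoder = v -> "int:" + v;

        ToySourceSpec<Integer> spec = createSource(parseFn, null, intCoder);
        System.out.println(spec.mode + " -> " + spec.outputCoder.encode(42));
    }
}
```

The point of the sketch is that the coder travels with parseFn into the source, so whatever later asks the source for its output coder gets one that matches the parsed type.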

This will of course add more complexity to an already complex process, but it will preserve backwards compatibility in more scenarios.

CC: @steveniemitz @kkdoon

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
