What happened?
On Beam 2.63.0, when writing GenericRecords with time-millis logical types to BQ via the Java SDK, a different value is written depending on whether I use the STORAGE_WRITE_API or the FILE_LOADS write method.
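For context on what these logical types encode (a minimal standalone sketch, not part of the repro): Avro's time-millis annotates an int counting milliseconds since midnight, and time-micros annotates a long counting microseconds since midnight, so 06:30:10 should encode as 23410000 and 23410000000 respectively:

```scala
import java.time.LocalTime

import org.apache.avro.{LogicalTypes, SchemaBuilder}
import org.apache.avro.data.TimeConversions.{TimeMicrosConversion, TimeMillisConversion}

object ExpectedEncodings {
  def main(args: Array[String]): Unit = {
    val t = LocalTime.of(6, 30, 10, 0)
    // time-millis: int, milliseconds since midnight
    val millis = new TimeMillisConversion()
      .toInt(t, SchemaBuilder.builder().intType(), LogicalTypes.timeMillis())
    // time-micros: long, microseconds since midnight
    val micros = new TimeMicrosConversion()
      .toLong(t, SchemaBuilder.builder().longType(), LogicalTypes.timeMicros())
    println(s"millis = $millis") // 23410000
    println(s"micros = $micros") // 23410000000
  }
}
```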
Repro setup (sorry for the Scio code; I will work on reproducing it in plain Beam as well):
```scala
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.spotify.scio.ContextAndArgs
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.util.Functions
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.data.TimeConversions.{TimeMicrosConversion, TimeMillisConversion}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils

import scala.jdk.CollectionConverters._

object BqWriteTest {

  val AvroSchema = new Schema.Parser().parse(
    s"""
       |{
       |  "type": "record",
       |  "namespace": "com.spotify.scio.extra.bigquery",
       |  "name": "AvroExampleWithLogicalType",
       |  "fields": [
       |    {"name": "timeMillisField", "type": {"type": "int", "logicalType": "time-millis"}},
       |    {"name": "timeMicrosField", "type": {"type": "long", "logicalType": "time-micros"}}
       |  ]
       |}
       |""".stripMargin
  )

  val TableSchema = new TableSchema().setFields(
    List(
      new TableFieldSchema().setName("timeMillisField").setType("TIME").setMode("REQUIRED"),
      new TableFieldSchema().setName("timeMicrosField").setType("TIME").setMode("REQUIRED")
    ).asJava
  )

  // 06:30:10 encoded via the standard Avro logical-type conversions
  val TestRecord: GenericRecord = new GenericRecordBuilder(AvroSchema)
    .set(
      "timeMillisField",
      new TimeMillisConversion()
        .toInt(java.time.LocalTime.of(6, 30, 10, 0), AvroSchema, LogicalTypes.timeMillis()))
    .set(
      "timeMicrosField",
      new TimeMicrosConversion()
        .toLong(java.time.LocalTime.of(6, 30, 10, 0), AvroSchema, LogicalTypes.timeMicros()))
    .build()

  def main(cmdLineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdLineArgs)
    // project/dataset supplied via --project/--dataset pipeline args
    val project = args("project")
    val dataset = args("dataset")

    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(AvroSchema)
    val records = sc.parallelize(Seq(TestRecord))

    // Save to BQ using File Loads
    records.saveAsCustomOutput(
      "Write to BQ using File Loads",
      org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
        .write[GenericRecord]()
        .to(s"$project:$dataset.test_bq_file_loads_api")
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .useAvroLogicalTypes()
        .withSchema(TableSchema)
        .withAvroFormatFunction(Functions.serializableFn(_.getElement))
        .withAvroSchemaFactory(Functions.serializableFn(BigQueryUtils.toGenericAvroSchema(_, true)))
        .withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS)
    )

    // Save to BQ using the Storage Write API
    records.saveAsCustomOutput(
      "Write to BQ using Storage Write API",
      org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
        .write[GenericRecord]()
        .to(s"$project:$dataset.test_bq_storage_write_api")
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .useAvroLogicalTypes()
        .withSchema(TableSchema)
        .withAvroFormatFunction(Functions.serializableFn(_.getElement))
        .withAvroSchemaFactory(Functions.serializableFn(BigQueryUtils.toGenericAvroSchema(_, true)))
        .withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STORAGE_WRITE_API)
    )

    sc.run()
  }
}
```

The time-millis value in the table written with FILE_LOADS does not match the value written with STORAGE_WRITE_API.
Tbh, the only value that looks correct is the file-loads micros value 😬. I'm extremely concerned about this, as potentially incorrect data is being written through BQ's Avro interface.
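For anyone trying to confirm the discrepancy, here is a hedged sketch (project/dataset are placeholders; table names follow the repro above) that reads both tables back with the BigQuery client and prints the stored TIME values:

```scala
import com.google.cloud.bigquery.{BigQueryOptions, QueryJobConfiguration}

import scala.jdk.CollectionConverters._

object CompareTables {
  def main(args: Array[String]): Unit = {
    val project = "<your-project>" // placeholder
    val dataset = "<your-dataset>" // placeholder
    val bq = BigQueryOptions.getDefaultInstance.getService

    for (table <- Seq("test_bq_file_loads_api", "test_bq_storage_write_api")) {
      val sql =
        s"SELECT timeMillisField, timeMicrosField FROM `$project.$dataset.$table`"
      val rows = bq.query(QueryJobConfiguration.newBuilder(sql).build()).iterateAll().asScala
      rows.foreach { row =>
        // Both columns should print 06:30:10 if the writes were correct
        println(
          s"$table: millis=${row.get("timeMillisField").getStringValue} " +
            s"micros=${row.get("timeMicrosField").getStringValue}")
      }
    }
  }
}
```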
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner

