From ce75c4f5bee2424dd8c24da90e9913d3daeb7f16 Mon Sep 17 00:00:00 2001 From: steve Date: Tue, 21 Apr 2020 11:10:59 -0400 Subject: [PATCH] [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery --- .../sdk/io/gcp/bigquery/AvroRowWriter.java | 15 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 75 ++++++++++++++----- .../sdk/io/gcp/bigquery/RowWriterFactory.java | 29 ++++--- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 65 +++++++++++++++- 4 files changed, 145 insertions(+), 39 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java index a0509a6ab030..74a0bb44532d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java @@ -20,28 +20,27 @@ import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.MimeTypes; -class AvroRowWriter extends BigQueryRowWriter { - private final DataFileWriter writer; +class AvroRowWriter extends BigQueryRowWriter { + private final DataFileWriter writer; private final Schema schema; - private final SerializableFunction, GenericRecord> toAvroRecord; + private final SerializableFunction, AvroT> toAvroRecord; AvroRowWriter( String basename, Schema schema, - SerializableFunction, GenericRecord> toAvroRecord) + SerializableFunction, AvroT> toAvroRecord, + SerializableFunction> writerFactory) throws Exception { super(basename, MimeTypes.BINARY); this.schema = schema; this.toAvroRecord = toAvroRecord; this.writer = 
- new DataFileWriter(new GenericDatumWriter<>()) - .create(schema, getOutputStream()); + new DataFileWriter<>(writerFactory.apply(schema)).create(schema, getOutputStream()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b0399cad9f94..24667e5c7472 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -51,7 +51,9 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; @@ -294,14 +296,16 @@ *
<ul> *
  <li>{@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} (recommended) to * write data using avro records. + *
  <li>{@link BigQueryIO.Write#withAvroWriter} to write avro data using a user-specified {@link + * DatumWriter} (and format function). *
  <li>{@link BigQueryIO.Write#withFormatFunction(SerializableFunction)} to write data as json * encoded {@link TableRow TableRows}. * </ul>
* - * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} is used, the table - * schema MUST be specified using one of the {@link Write#withJsonSchema(String)}, {@link - * Write#withJsonSchema(ValueProvider)}, {@link Write#withSchemaFromView(PCollectionView)} methods, - * or {@link Write#to(DynamicDestinations)}. + * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} or {@link + * BigQueryIO.Write#withAvroWriter} is used, the table schema MUST be specified using one of the + * {@link Write#withJsonSchema(String)}, {@link Write#withJsonSchema(ValueProvider)}, {@link + * Write#withSchemaFromView(PCollectionView)} methods, or {@link Write#to(DynamicDestinations)}. * *
<pre>{@code
  * class Quote {
@@ -488,6 +492,9 @@ public class BigQueryIO {
    */
   static final SerializableFunction IDENTITY_FORMATTER = input -> input;
 
+  static final SerializableFunction>
+      GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
+
   private static final SerializableFunction
       DEFAULT_AVRO_SCHEMA_FACTORY =
           new SerializableFunction() {
@@ -1763,7 +1770,7 @@ public enum Method {
     abstract SerializableFunction getFormatFunction();
 
     @Nullable
-    abstract SerializableFunction, GenericRecord> getAvroFormatFunction();
+    abstract RowWriterFactory.AvroRowWriterFactory getAvroRowWriterFactory();
 
     @Nullable
     abstract SerializableFunction getAvroSchemaFactory();
@@ -1851,8 +1858,8 @@ abstract Builder setTableFunction(
 
       abstract Builder setFormatFunction(SerializableFunction formatFunction);
 
-      abstract Builder setAvroFormatFunction(
-          SerializableFunction, GenericRecord> avroFormatFunction);
+      abstract Builder setAvroRowWriterFactory(
+          RowWriterFactory.AvroRowWriterFactory avroRowWriterFactory);
 
       abstract Builder setAvroSchemaFactory(
           SerializableFunction avroSchemaFactory);
@@ -2056,13 +2063,43 @@ public Write withFormatFunction(SerializableFunction formatFunct
     }
 
     /**
-     * Formats the user's type into a {@link GenericRecord} to be written to BigQuery.
+     * Formats the user's type into a {@link GenericRecord} to be written to BigQuery. The
+     * GenericRecords are written as avro using the standard {@link GenericDatumWriter}.
      *
     * <p>
This is mutually exclusive with {@link #withFormatFunction}, only one may be set. */ public Write withAvroFormatFunction( SerializableFunction, GenericRecord> avroFormatFunction) { - return toBuilder().setAvroFormatFunction(avroFormatFunction).setOptimizeWrites(true).build(); + return withAvroWriter(avroFormatFunction, GENERIC_DATUM_WRITER_FACTORY); + } + + /** + * Writes the user's type as avro using the supplied {@link DatumWriter}. + * + *

This is mutually exclusive with {@link #withFormatFunction}, only one may be set. + * + *

Overwrites {@link #withAvroFormatFunction} if it has been set. + */ + public Write withAvroWriter( + SerializableFunction> writerFactory) { + return withAvroWriter(AvroWriteRequest::getElement, writerFactory); + } + + /** + * Converts the user's type to an avro record using the supplied avroFormatFunction. Records + * are then written using the supplied writer instances returned from writerFactory. + * + *

This is mutually exclusive with {@link #withFormatFunction}, only one may be set. + * + *

Overwrites {@link #withAvroFormatFunction} if it has been set. + */ + public Write withAvroWriter( + SerializableFunction, AvroT> avroFormatFunction, + SerializableFunction> writerFactory) { + return toBuilder() + .setOptimizeWrites(true) + .setAvroRowWriterFactory(RowWriterFactory.avroRecords(avroFormatFunction, writerFactory)) + .build(); } /** @@ -2484,7 +2521,7 @@ public WriteResult expand(PCollection input) { if (method != Method.FILE_LOADS) { // we only support writing avro for FILE_LOADS checkArgument( - getAvroFormatFunction() == null, + getAvroRowWriterFactory() == null, "Writing avro formatted data is only supported for FILE_LOADS, however " + "the method was %s", method); @@ -2546,8 +2583,8 @@ private WriteResult expandTyped( PCollection input, DynamicDestinations dynamicDestinations) { boolean optimizeWrites = getOptimizeWrites(); SerializableFunction formatFunction = getFormatFunction(); - SerializableFunction, GenericRecord> avroFormatFunction = - getAvroFormatFunction(); + RowWriterFactory.AvroRowWriterFactory avroRowWriterFactory = + (RowWriterFactory.AvroRowWriterFactory) getAvroRowWriterFactory(); boolean hasSchema = getJsonSchema() != null @@ -2559,8 +2596,8 @@ private WriteResult expandTyped( optimizeWrites = true; checkArgument( - avroFormatFunction == null, - "avroFormatFunction is unsupported when using Beam schemas."); + avroRowWriterFactory == null, + "avro avroFormatFunction is unsupported when using Beam schemas."); if (formatFunction == null) { // If no format function set, then we will automatically convert the input type to a @@ -2593,10 +2630,10 @@ private WriteResult expandTyped( Method method = resolveMethod(input); if (optimizeWrites) { RowWriterFactory rowWriterFactory; - if (avroFormatFunction != null) { + if (avroRowWriterFactory != null) { checkArgument( formatFunction == null, - "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both."); + "Only one of withFormatFunction or 
withAvroFormatFunction/withAvroWriter maybe set, not both."); SerializableFunction avroSchemaFactory = getAvroSchemaFactory(); @@ -2607,9 +2644,7 @@ private WriteResult expandTyped( + "is set but no avroSchemaFactory is defined."); avroSchemaFactory = DEFAULT_AVRO_SCHEMA_FACTORY; } - rowWriterFactory = - RowWriterFactory.avroRecords( - avroFormatFunction, avroSchemaFactory, dynamicDestinations); + rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations, avroSchemaFactory); } else if (formatFunction != null) { rowWriterFactory = RowWriterFactory.tableRows(formatFunction); } else { @@ -2634,7 +2669,7 @@ private WriteResult expandTyped( rowWriterFactory, method); } else { - checkArgument(avroFormatFunction == null); + checkArgument(avroRowWriterFactory == null); checkArgument( formatFunction != null, "A function must be provided to convert the input type into a TableRow or " diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java index d8e4ea6b29cf..7229957582ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java @@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableSchema; import java.io.Serializable; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; import org.apache.beam.sdk.transforms.SerializableFunction; abstract class RowWriterFactory implements Serializable { @@ -74,29 +74,38 @@ String getSourceFormat() { } } - static RowWriterFactory avroRecords( - SerializableFunction, GenericRecord> toAvro, - SerializableFunction schemaFactory, - DynamicDestinations dynamicDestinations) { - return new AvroRowWriterFactory<>(toAvro, 
schemaFactory, dynamicDestinations); + static + AvroRowWriterFactory avroRecords( + SerializableFunction, AvroT> toAvro, + SerializableFunction> writerFactory) { + return new AvroRowWriterFactory<>(toAvro, writerFactory, null, null); } - private static final class AvroRowWriterFactory + static final class AvroRowWriterFactory extends RowWriterFactory { - private final SerializableFunction, GenericRecord> toAvro; + private final SerializableFunction, AvroT> toAvro; + private final SerializableFunction> writerFactory; private final SerializableFunction schemaFactory; private final DynamicDestinations dynamicDestinations; private AvroRowWriterFactory( - SerializableFunction, GenericRecord> toAvro, + SerializableFunction, AvroT> toAvro, + SerializableFunction> writerFactory, SerializableFunction schemaFactory, DynamicDestinations dynamicDestinations) { this.toAvro = toAvro; + this.writerFactory = writerFactory; this.schemaFactory = schemaFactory; this.dynamicDestinations = dynamicDestinations; } + AvroRowWriterFactory prepare( + DynamicDestinations dynamicDestinations, + SerializableFunction schemaFactory) { + return new AvroRowWriterFactory<>(toAvro, writerFactory, schemaFactory, dynamicDestinations); + } + @Override OutputType getOutputType() { return OutputType.AvroGenericRecord; @@ -107,7 +116,7 @@ BigQueryRowWriter createRowWriter(String tempFilePrefix, DestinationT throws Exception { TableSchema tableSchema = dynamicDestinations.getSchema(destination); Schema avroSchema = schemaFactory.apply(tableSchema); - return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro); + return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro, writerFactory); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 0e2064cb5d4b..f3b966293d31 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -65,7 +65,10 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -782,6 +785,66 @@ public void testWriteAvro() throws Exception { .set("instantVal", "2019-02-01 00:00:00 UTC"))); } + @Test + public void testWriteAvroWithCustomWriter() throws Exception { + SerializableFunction, GenericRecord> formatFunction = + r -> { + GenericRecord rec = new GenericData.Record(r.getSchema()); + InputRecord i = r.getElement(); + rec.put("strVal", i.strVal()); + rec.put("longVal", i.longVal()); + rec.put("doubleVal", i.doubleVal()); + rec.put("instantVal", i.instantVal().getMillis() * 1000); + return rec; + }; + + SerializableFunction> customWriterFactory = + s -> + new GenericDatumWriter() { + @Override + protected void writeString(org.apache.avro.Schema schema, Object datum, Encoder out) + throws IOException { + super.writeString(schema, datum.toString() + "_custom", out); + } + }; + + p.apply( + Create.of( + InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")), + InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z"))) + .withCoder(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("strVal").setType("STRING"), + new 
TableFieldSchema().setName("longVal").setType("INTEGER"), + new TableFieldSchema().setName("doubleVal").setType("FLOAT"), + new TableFieldSchema().setName("instantVal").setType("TIMESTAMP")))) + .withTestServices(fakeBqServices) + .withAvroWriter(formatFunction, customWriterFactory) + .withoutValidation()); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow() + .set("strVal", "test_custom") + .set("longVal", "1") + .set("doubleVal", 1.0D) + .set("instantVal", "2019-01-01 00:00:00 UTC"), + new TableRow() + .set("strVal", "test2_custom") + .set("longVal", "2") + .set("doubleVal", 2.0D) + .set("instantVal", "2019-02-01 00:00:00 UTC"))); + } + @Test public void testStreamingWrite() throws Exception { p.apply( @@ -1352,7 +1415,7 @@ public void testWriteValidateFailsBothFormatFunctions() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( - "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both"); + "Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set, not both."); p.apply(Create.empty(INPUT_RECORD_CODER)) .apply( BigQueryIO.write()