diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 8aad6348fbad..a45728236ab2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -147,10 +147,14 @@ public class AvroUtils { GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion()); } - // Unwrap an AVRO schema into the base type an whether it is nullable. - static class TypeWithNullability { - public final org.apache.avro.Schema type; - public final boolean nullable; + /** Unwrap an AVRO schema into the base type and whether it is nullable. */ + public static class TypeWithNullability { + final org.apache.avro.Schema type; + final boolean nullable; + + public static TypeWithNullability create(org.apache.avro.Schema avroSchema) { + return new TypeWithNullability(avroSchema); + } TypeWithNullability(org.apache.avro.Schema avroSchema) { if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { @@ -182,6 +186,14 @@ static class TypeWithNullability { nullable = false; } } + + public Boolean isNullable() { + return nullable; + } + + public org.apache.avro.Schema getType() { + return type; + } } /** Wrapper for fixed byte fields. */ diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index ef014af53035..fcd8c11089b8 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -152,10 +152,14 @@ public class AvroUtils { } // Unwrap an AVRO schema into the base type an whether it is nullable. 
- static class TypeWithNullability { + public static class TypeWithNullability { public final org.apache.avro.Schema type; public final boolean nullable; + public static TypeWithNullability create(org.apache.avro.Schema avroSchema) { + return new TypeWithNullability(avroSchema); + } + TypeWithNullability(org.apache.avro.Schema avroSchema) { if (avroSchema.getType() == Type.UNION) { List types = avroSchema.getTypes(); @@ -184,6 +188,14 @@ static class TypeWithNullability { nullable = false; } } + + public Boolean isNullable() { + return nullable; + } + + public org.apache.avro.Schema getType() { + return type; + } } /** Wrapper for fixed byte fields. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java new file mode 100644 index 000000000000..7becffa6d17f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.DynamicMessage; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Days; +import org.joda.time.Instant; +import org.joda.time.ReadableInstant; + +/** + * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message, + * for use with the Storage write API. 
+ */ +public class AvroGenericRecordToStorageApiProto { + + static final Map PRIMITIVE_TYPES = + ImmutableMap.builder() + .put(Schema.Type.INT, TableFieldSchema.Type.INT64) + .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES) + .put(Schema.Type.LONG, TableFieldSchema.Type.INT64) + .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE) + .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE) + .put(Schema.Type.STRING, TableFieldSchema.Type.STRING) + .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL) + .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING) + .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES) + .build(); + + // A map of supported logical types to the protobuf field type. + static final Map LOGICAL_TYPES = + ImmutableMap.builder() + .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE) + .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC) + .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP) + .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP) + .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING) + .build(); + + static final Map> PRIMITIVE_ENCODERS = + ImmutableMap.>builder() + .put(Schema.Type.INT, o -> Long.valueOf((int) o)) + .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes())) + .put(Schema.Type.LONG, Functions.identity()) + .put(Schema.Type.FLOAT, o -> Double.parseDouble(Float.valueOf((float) o).toString())) + .put(Schema.Type.DOUBLE, Function.identity()) + .put(Schema.Type.STRING, Function.identity()) + .put(Schema.Type.BOOLEAN, Function.identity()) + .put(Schema.Type.ENUM, o -> o.toString()) + .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o)) + .build(); + + // A map of supported logical types to their encoding functions. 
+ static final Map> LOGICAL_TYPE_ENCODERS = + ImmutableMap.>builder() + .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value)) + .put( + LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal) + .put( + LogicalTypes.timestampMicros().getName(), + (logicalType, value) -> convertTimestamp(value, true)) + .put( + LogicalTypes.timestampMillis().getName(), + (logicalType, value) -> convertTimestamp(value, false)) + .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value)) + .build(); + + static String convertUUID(Object value) { + if (value instanceof UUID) { + return ((UUID) value).toString(); + } else { + Preconditions.checkArgument(value instanceof String, "Expecting a value as String type."); + UUID.fromString((String) value); + return (String) value; + } + } + + static Long convertTimestamp(Object value, boolean micros) { + if (value instanceof ReadableInstant) { + return ((ReadableInstant) value).getMillis() * (micros ? 
1000 : 1); + } else { + Preconditions.checkArgument( + value instanceof Long, "Expecting a value as Long type (millis)."); + return (Long) value; + } + } + + static Integer convertDate(Object value) { + if (value instanceof ReadableInstant) { + return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); + } else { + Preconditions.checkArgument( + value instanceof Integer, "Expecting a value as Integer type (days)."); + return (Integer) value; + } + } + + static ByteString convertDecimal(LogicalType logicalType, Object value) { + ByteBuffer byteBuffer = (ByteBuffer) value; + BigDecimal bigDecimal = + new Conversions.DecimalConversion() + .fromBytes( + byteBuffer.duplicate(), + Schema.create(Schema.Type.NULL), // dummy schema, not used + logicalType); + return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal); + } + + /** + * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data + * through BigQuery Storage API. + * + * @param schema An Avro Schema + * @return Returns the TableSchema created from the provided Schema + */ + public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) { + Preconditions.checkState(!schema.getFields().isEmpty()); + + TableSchema.Builder builder = TableSchema.newBuilder(); + for (Schema.Field field : schema.getFields()) { + builder.addFields(fieldDescriptorFromAvroField(field)); + } + return builder.build(); + } + + /** + * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used + * to write data using the BigQuery Storage streaming API. 
+ * + * @param descriptor The Descriptor for the DynamicMessage result + * @param record An Avro GenericRecord + * @return A dynamic message representation of a Proto payload to be used for StorageWrite API + */ + public static DynamicMessage messageFromGenericRecord( + Descriptor descriptor, GenericRecord record) { + Schema schema = record.getSchema(); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + for (Schema.Field field : schema.getFields()) { + FieldDescriptor fieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase())); + @Nullable + Object value = + messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record); + if (value != null) { + builder.setField(fieldDescriptor, value); + } + } + return builder.build(); + } + + private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) { + @Nullable Schema schema = field.schema(); + Preconditions.checkNotNull(schema, "Unexpected null schema!"); + TableFieldSchema.Builder builder = + TableFieldSchema.newBuilder().setName(field.name().toLowerCase()); + Schema elementType = null; + switch (schema.getType()) { + case RECORD: + Preconditions.checkState(!schema.getFields().isEmpty()); + builder = builder.setType(TableFieldSchema.Type.STRUCT); + for (Schema.Field recordField : schema.getFields()) { + builder = builder.addFields(fieldDescriptorFromAvroField(recordField)); + } + break; + case ARRAY: + elementType = TypeWithNullability.create(schema.getElementType()).getType(); + if (elementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + Preconditions.checkState( + elementType.getType() != Schema.Type.ARRAY, "Nested arrays not supported by BigQuery."); + + TableFieldSchema elementFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal())); + builder = builder.setType(elementFieldSchema.getType()); + 
builder.addAllFields(elementFieldSchema.getFieldsList()); + builder = builder.setMode(TableFieldSchema.Mode.REPEATED); + break; + case MAP: + Schema keyType = Schema.create(Schema.Type.STRING); + Schema valueType = TypeWithNullability.create(schema.getElementType()).getType(); + if (valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + TableFieldSchema keyFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE)); + TableFieldSchema valueFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field( + "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE)); + builder = + builder + .setType(TableFieldSchema.Type.STRUCT) + .addFields(keyFieldSchema) + .addFields(valueFieldSchema) + .setMode(TableFieldSchema.Mode.REPEATED); + break; + case UNION: + elementType = TypeWithNullability.create(schema).getType(); + if (elementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + // check to see if more than one non-null type is defined in the union + Preconditions.checkState( + elementType.getType() != Schema.Type.UNION, + "Multiple non-null union types are not supported."); + TableFieldSchema unionFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal())); + builder = + builder + .setType(unionFieldSchema.getType()) + .addAllFields(unionFieldSchema.getFieldsList()); + break; + default: + elementType = TypeWithNullability.create(schema).getType(); + @Nullable + TableFieldSchema.Type primitiveType = + Optional.ofNullable(LogicalTypes.fromSchema(elementType)) + .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName())) + .orElse(PRIMITIVE_TYPES.get(elementType.getType())); + if (primitiveType == null) { + throw new RuntimeException("Unsupported type " + elementType.getType()); + } + // a scalar will be required by default, if defined as part of union 
then + // caller will set nullability requirements + builder = builder.setType(primitiveType); + } + if (builder.getMode() != TableFieldSchema.Mode.REPEATED) { + if (TypeWithNullability.create(schema).isNullable()) { + builder = builder.setMode(TableFieldSchema.Mode.NULLABLE); + } else { + builder = builder.setMode(TableFieldSchema.Mode.REQUIRED); + } + } + if (field.doc() != null) { + builder = builder.setDescription(field.doc()); + } + return builder.build(); + } + + @Nullable + private static Object messageValueFromGenericRecordValue( + FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) { + @Nullable Object value = record.get(name); + if (value == null) { + if (fieldDescriptor.isOptional()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field " + fieldDescriptor.getName()); + } + } + return toProtoValue(fieldDescriptor, avroField.schema(), value); + } + + private static Object toProtoValue( + FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) { + switch (avroSchema.getType()) { + case RECORD: + return messageFromGenericRecord(fieldDescriptor.getMessageType(), (GenericRecord) value); + case ARRAY: + Iterable iterable = (Iterable) value; + @Nullable Schema arrayElementType = avroSchema.getElementType(); + if (arrayElementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + return StreamSupport.stream(iterable.spliterator(), false) + .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v)) + .collect(Collectors.toList()); + case UNION: + TypeWithNullability type = TypeWithNullability.create(avroSchema); + Preconditions.checkState( + type.getType().getType() != Schema.Type.UNION, + "Multiple non-null union types are not supported."); + return toProtoValue(fieldDescriptor, type.getType(), value); + case MAP: + Map map = (Map) value; + Schema valueType = 
TypeWithNullability.create(avroSchema.getElementType()).getType(); + if (valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + + return map.entrySet().stream() + .map( + (Map.Entry entry) -> + mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry)) + .collect(Collectors.toList()); + default: + return scalarToProtoValue(avroSchema, value); + } + } + + static Object mapEntryToProtoValue( + Descriptor descriptor, Schema valueFieldType, Map.Entry entryValue) { + + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + FieldDescriptor keyFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("key")); + @Nullable + Object key = + toProtoValue(keyFieldDescriptor, Schema.create(Schema.Type.STRING), entryValue.getKey()); + if (key != null) { + builder.setField(keyFieldDescriptor, key); + } + FieldDescriptor valueFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("value")); + @Nullable + Object value = toProtoValue(valueFieldDescriptor, valueFieldType, entryValue.getValue()); + if (value != null) { + builder.setField(valueFieldDescriptor, value); + } + return builder.build(); + } + + @VisibleForTesting + static Object scalarToProtoValue(Schema fieldSchema, Object value) { + TypeWithNullability type = TypeWithNullability.create(fieldSchema); + LogicalType logicalType = LogicalTypes.fromSchema(type.getType()); + if (logicalType != null) { + @Nullable + BiFunction logicalTypeEncoder = + LOGICAL_TYPE_ENCODERS.get(logicalType.getName()); + if (logicalTypeEncoder == null) { + throw new IllegalArgumentException("Unsupported logical type " + logicalType.getName()); + } + return logicalTypeEncoder.apply(logicalType, value); + } else { + @Nullable Function encoder = PRIMITIVE_ENCODERS.get(type.getType().getType()); + if (encoder == null) { + throw new RuntimeException("Unexpected beam type " + fieldSchema); + } + return encoder.apply(value); + } + } +} diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 5e032d96962a..6f4e838d7780 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -540,7 +540,15 @@ public class BigQueryIO { * A formatting function that maps a TableRow to itself. This allows sending a {@code * PCollection} directly to BigQueryIO.Write. */ - static final SerializableFunction IDENTITY_FORMATTER = input -> input; + static final SerializableFunction TABLE_ROW_IDENTITY_FORMATTER = + SerializableFunctions.identity();; + + /** + * A formatting function that maps a GenericRecord to itself. This allows sending a {@code + * PCollection} directly to BigQueryIO.Write. + */ + static final SerializableFunction, GenericRecord> + GENERIC_RECORD_IDENTITY_FORMATTER = AvroWriteRequest::getElement; static final SerializableFunction> GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>(); @@ -1925,7 +1933,16 @@ public static Write write() { * Write#withFormatFunction(SerializableFunction)}. */ public static Write writeTableRows() { - return BigQueryIO.write().withFormatFunction(IDENTITY_FORMATTER); + return BigQueryIO.write().withFormatFunction(TABLE_ROW_IDENTITY_FORMATTER); + } + + /** + * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord + * GenericRecords} to a BigQuery table. + */ + public static Write writeGenericRecords() { + return BigQueryIO.write() + .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER); } /** Implementation of {@link #write}. 
*/ @@ -2887,15 +2904,6 @@ public WriteResult expand(PCollection input) { !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas."); } - if (method != Write.Method.FILE_LOADS) { - // we only support writing avro for FILE_LOADS - checkArgument( - getAvroRowWriterFactory() == null, - "Writing avro formatted data is only supported for FILE_LOADS, however " - + "the method was %s", - method); - } - if (input.isBounded() == IsBounded.BOUNDED) { checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input."); } @@ -3172,6 +3180,26 @@ private WriteResult continueExpandTyped( storageApiDynamicDestinations = new StorageApiDynamicDestinationsBeamRow<>( dynamicDestinations, elementSchema, elementToRowFunction); + } else if (getAvroRowWriterFactory() != null) { + // we can configure the avro to storage write api proto converter for this + // assuming the format function returns an Avro GenericRecord + // and there is a schema defined + checkArgument( + getJsonSchema() != null + || getDynamicDestinations() != null + || getSchemaFromView() != null, + "A schema must be provided for avro rows to be used with StorageWrite API."); + + RowWriterFactory.AvroRowWriterFactory + recordWriterFactory = + (RowWriterFactory.AvroRowWriterFactory) + rowWriterFactory; + SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory = + Optional.ofNullable(getAvroSchemaFactory()).orElse(DEFAULT_AVRO_SCHEMA_FACTORY); + + storageApiDynamicDestinations = + new StorageApiDynamicDestinationsGenericRecord<>( + dynamicDestinations, avroSchemaFactory, recordWriterFactory.getToAvroFn()); } else { RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = (RowWriterFactory.TableRowWriterFactory) rowWriterFactory; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java index 710e9d77c8af..21bf9ae74adf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java @@ -119,6 +119,10 @@ AvroRowWriterFactory prepare( return new AvroRowWriterFactory<>(toAvro, writerFactory, schemaFactory, dynamicDestinations); } + SerializableFunction, AvroT> getToAvroFn() { + return toAvro; + } + @Override OutputType getOutputType() { return OutputType.AvroGenericRecord; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java new file mode 100644 index 000000000000..020ea70df539 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Message; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Storage API DynamicDestinations used when the input is an Avro GenericRecord. */ +class StorageApiDynamicDestinationsGenericRecord + extends StorageApiDynamicDestinations { + + private final SerializableFunction, GenericRecord> toGenericRecord; + private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory; + + StorageApiDynamicDestinationsGenericRecord( + DynamicDestinations inner, + SerializableFunction<@Nullable TableSchema, Schema> schemaFactory, + SerializableFunction, GenericRecord> toGenericRecord) { + super(inner); + this.toGenericRecord = toGenericRecord; + this.schemaFactory = schemaFactory; + } + + @Override + public MessageConverter getMessageConverter( + DestinationT destination, DatasetService datasetService) throws Exception { + return new GenericRecordConverter(destination); + } + + class GenericRecordConverter implements MessageConverter { + + final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema; + final Schema avroSchema; + final TableSchema bqTableSchema; + final Descriptor descriptor; + + GenericRecordConverter(DestinationT destination) throws Exception { + avroSchema = schemaFactory.apply(getSchema(destination)); + bqTableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema)); + protoTableSchema = + 
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema); + descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true); + } + + @Override + @SuppressWarnings("nullness") + public StorageApiWritePayload toMessage(T element) { + Message msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord( + descriptor, toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema))); + return new AutoValue_StorageApiWritePayload(msg.toByteArray(), null); + } + + @Override + public TableRow toTableRow(T element) { + return BigQueryUtils.convertGenericRecordToTableRow( + toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema); + } + + @Override + public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() { + return protoTableSchema; + } + + @Override + public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) + throws Exception { + throw new RuntimeException("Not supported"); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java new file mode 100644 index 000000000000..94c58e414251 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import com.google.protobuf.DescriptorProtos.DescriptorProto; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.EnumSet; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Days; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) 
+@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/** Unit tests for {@link AvroGenericRecordToStorageApiProto}. */ +public class AvroGenericRecordToStorageApiProtoTest { + + enum TestEnum { + ONE, + TWO, + RED, + BLUE + } + + private static final String[] TEST_ENUM_STRS = + EnumSet.allOf(TestEnum.class).stream().map(TestEnum::name).toArray(String[]::new); + + private static final Schema BASE_SCHEMA = + SchemaBuilder.record("TestRecord") + .fields() + .optionalBytes("bytesValue") + .requiredInt("intValue") + .optionalLong("longValue") + .optionalFloat("floatValue") + .optionalDouble("doubleValue") + .optionalString("stringValue") + .optionalBoolean("booleanValue") + .name("arrayValue") + .type() + .array() + .items() + .stringType() + .noDefault() + .name("enumValue") + .type() + .optional() + .enumeration("testEnum") + .symbols(TEST_ENUM_STRS) + .name("fixedValue") + .type() + .fixed("MD5") + .size(16) + .noDefault() + .endRecord(); + + private static final Schema LOGICAL_TYPES_SCHEMA = + SchemaBuilder.record("LogicalTypesRecord") + .fields() + .name("decimalValue") + .type(LogicalTypes.decimal(1, 1).addToSchema(Schema.create(Schema.Type.BYTES))) + .noDefault() + .name("dateValue") + .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))) + .noDefault() + .name("timestampMicrosValue") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestampMillisValue") + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestampMicrosAsInstantValue") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestampMillisAsInstantValue") + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("uuidValue") + .type(LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))) + .noDefault() + 
.endRecord(); + + private static final DescriptorProto BASE_SCHEMA_PROTO = + DescriptorProto.newBuilder() + .addField( + FieldDescriptorProto.newBuilder() + .setName("bytesvalue") + .setNumber(1) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("intvalue") + .setNumber(2) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("longvalue") + .setNumber(3) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("floatvalue") + .setNumber(4) + .setType(Type.TYPE_DOUBLE) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("doublevalue") + .setNumber(5) + .setType(Type.TYPE_DOUBLE) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("stringvalue") + .setNumber(6) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("booleanvalue") + .setNumber(7) + .setType(Type.TYPE_BOOL) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("arrayvalue") + .setNumber(8) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_REPEATED) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("enumvalue") + .setNumber(9) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("fixedvalue") + .setNumber(10) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_REQUIRED) + .build()) + .build(); + + private static final DescriptorProto LOGICAL_TYPES_SCHEMA_PROTO = + DescriptorProto.newBuilder() + .addField( + FieldDescriptorProto.newBuilder() + .setName("decimalvalue") + .setNumber(1) + .setType(Type.TYPE_BYTES) + 
.setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("datevalue") + .setNumber(2) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmicrosvalue") + .setNumber(3) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmillisvalue") + .setNumber(4) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmicrosasinstantvalue") + .setNumber(5) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmillisasinstantvalue") + .setNumber(6) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("uuidvalue") + .setNumber(7) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .build(); + + private static final byte[] BYTES = "BYTE BYTE BYTE".getBytes(StandardCharsets.UTF_8); + + private static final Schema NESTED_SCHEMA = + SchemaBuilder.record("TestNestedRecord") + .fields() + .name("nested") + .type() + .optional() + .type(BASE_SCHEMA) + .name("nestedArray") + .type() + .array() + .items(BASE_SCHEMA) + .noDefault() + .endRecord(); + + private static GenericRecord baseRecord; + private static GenericRecord logicalTypesRecord; + private static Map baseProtoExpectedFields; + private static Map logicalTypesProtoExpectedFields; + private static GenericRecord nestedRecord; + + static { + try { + byte[] md5 = MessageDigest.getInstance("MD5").digest(BYTES); + Instant now = Instant.now(); + baseRecord = + new GenericRecordBuilder(BASE_SCHEMA) + .set("bytesValue", BYTES) + .set("intValue", (int) 3) + .set("longValue", (long) 4) + .set("floatValue", (float) 3.14) + 
.set("doubleValue", (double) 2.68) + .set("stringValue", "I am a string. Hear me roar.") + .set("booleanValue", true) + .set("arrayValue", ImmutableList.of("one", "two", "red", "blue")) + .set( + "enumValue", + new GenericData.EnumSymbol( + BASE_SCHEMA.getField("enumValue").schema(), TestEnum.TWO)) + .set( + "fixedValue", + new GenericData.Fixed(BASE_SCHEMA.getField("fixedValue").schema(), md5)) + .build(); + BigDecimal bd = BigDecimal.valueOf(1.1D); + UUID uuid = UUID.randomUUID(); + logicalTypesRecord = + new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) + .set( + "decimalValue", + new Conversions.DecimalConversion() + .toBytes(bd, Schema.create(Schema.Type.NULL), LogicalTypes.decimal(1, 1))) + .set("dateValue", now) + .set("timestampMicrosValue", now.getMillis() * 1000) + .set("timestampMicrosAsInstantValue", now) + .set("timestampMillisValue", now.getMillis()) + .set("timestampMillisAsInstantValue", now) + .set("uuidValue", uuid) + .build(); + baseProtoExpectedFields = + ImmutableMap.builder() + .put("bytesvalue", ByteString.copyFrom(BYTES)) + .put("intvalue", (long) 3) + .put("longvalue", (long) 4) + .put("floatvalue", (double) 3.14) + .put("doublevalue", (double) 2.68) + .put("stringvalue", "I am a string. 
Hear me roar.") + .put("booleanvalue", true) + .put("arrayvalue", ImmutableList.of("one", "two", "red", "blue")) + .put("enumvalue", TEST_ENUM_STRS[1]) + .put("fixedvalue", ByteString.copyFrom(md5)) + .build(); + logicalTypesProtoExpectedFields = + ImmutableMap.builder() + .put("decimalvalue", BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bd)) + .put( + "datevalue", + Days.daysBetween(Instant.EPOCH.toDateTime(), now.toDateTime()).getDays()) + .put("timestampmicrosvalue", now.getMillis() * 1000) + .put("timestampmicrosasinstantvalue", now.getMillis() * 1000) + .put("timestampmillisvalue", now.getMillis()) + .put("timestampmillisasinstantvalue", now.getMillis()) + .put("uuidvalue", uuid.toString()) + .build(); + nestedRecord = + new GenericRecordBuilder(NESTED_SCHEMA) + .set("nested", baseRecord) + .set("nestedArray", ImmutableList.of(baseRecord, baseRecord)) + .build(); + } catch (NoSuchAlgorithmException ex) { + throw new RuntimeException("Error initializing test data", ex); + } + } + + void validateDescriptorAgainstSchema(Schema originalSchema, DescriptorProto schemaProto) { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(originalSchema), + true); + Map types = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + Map expectedTypes = + schemaProto.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedTypes, types); + + Map nameMapping = + originalSchema.getFields().stream() + .collect(Collectors.toMap(f -> f.name().toLowerCase(), f -> f.name())); + descriptor + .getFieldList() + .forEach( + p -> { + TypeWithNullability fieldSchema = + TypeWithNullability.create( + originalSchema.getField(nameMapping.get(p.getName())).schema()); + Label label = + fieldSchema.getType().getType() == 
Schema.Type.ARRAY + ? Label.LABEL_REPEATED + : fieldSchema.isNullable() ? Label.LABEL_OPTIONAL : Label.LABEL_REQUIRED; + assertEquals(label, p.getLabel()); + }); + } + + @Test + public void testDescriptorFromSchema() { + validateDescriptorAgainstSchema(BASE_SCHEMA, BASE_SCHEMA_PROTO); + } + + @Test + public void testDescriptorFromSchemaLogicalTypes() { + validateDescriptorAgainstSchema(LOGICAL_TYPES_SCHEMA, LOGICAL_TYPES_SCHEMA_PROTO); + } + + @Test + public void testNestedFromSchema() { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true); + Map expectedBaseTypes = + BASE_SCHEMA_PROTO.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + + Map types = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + Map typeNames = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName)); + Map typeLabels = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel)); + + assertEquals(2, types.size()); + + Map nestedTypes = + descriptor.getNestedTypeList().stream() + .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); + assertEquals(2, nestedTypes.size()); + assertEquals(Type.TYPE_MESSAGE, types.get("nested")); + assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested")); + String nestedTypeName1 = typeNames.get("nested"); + Map nestedTypes1 = + nestedTypes.get(nestedTypeName1).getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedBaseTypes, nestedTypes1); + + assertEquals(Type.TYPE_MESSAGE, types.get("nestedarray")); + 
assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarray")); + String nestedTypeName2 = typeNames.get("nestedarray"); + Map nestedTypes2 = + nestedTypes.get(nestedTypeName2).getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedBaseTypes, nestedTypes2); + } + + private void assertBaseRecord(DynamicMessage msg, Map expectedFields) { + Map recordFields = + msg.getAllFields().entrySet().stream() + .collect( + Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); + assertEquals(expectedFields, recordFields); + } + + @Test + public void testMessageFromGenericRecord() throws Exception { + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, nestedRecord); + + assertEquals(2, msg.getAllFields().size()); + + Map fieldDescriptors = + descriptor.getFields().stream() + .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity())); + DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested")); + assertBaseRecord(nestedMsg, baseProtoExpectedFields); + } + + @Test + public void testMessageFromGenericRecordLogicalTypes() throws Exception { + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(LOGICAL_TYPES_SCHEMA), + true); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, logicalTypesRecord); + assertEquals(7, msg.getAllFields().size()); + assertBaseRecord(msg, logicalTypesProtoExpectedFields); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 7f55385dd034..69c57d25949a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1138,55 +1138,90 @@ public static InputRecord create( private static final Coder INPUT_RECORD_CODER = SerializableCoder.of(InputRecord.class); - @Test - public void testWriteAvro() throws Exception { - assumeTrue(!useStorageApi); - assumeTrue(!useStreaming); + public void runTestWriteAvro(boolean schemaFromView) throws Exception { + String tableName = "project-id:dataset-id.table-id"; + BigQueryIO.Write bqWrite = + BigQueryIO.write() + .to(tableName) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withTestServices(fakeBqServices) + .withAvroFormatFunction( + r -> { + GenericRecord rec = new GenericData.Record(r.getSchema()); + InputRecord i = r.getElement(); + rec.put("strval", i.strVal()); + rec.put("longval", i.longVal()); + rec.put("doubleval", i.doubleVal()); + rec.put("instantval", i.instantVal().getMillis() * 1000); + return rec; + }) + .withoutValidation(); + TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("strval").setType("STRING"), + new TableFieldSchema().setName("longval").setType("INTEGER"), + new TableFieldSchema().setName("doubleval").setType("FLOAT"), + new TableFieldSchema().setName("instantval").setType("TIMESTAMP"))); + if (schemaFromView) { + bqWrite = + bqWrite.withSchemaFromView( + p.apply( + "CreateTableSchemaString", + Create.of(KV.of(tableName, BigQueryHelpers.toJsonString(tableSchema)))) + .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + .apply(View.asMap())); + } else { + bqWrite = bqWrite.withSchema(tableSchema); + } p.apply( Create.of( 
InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")), InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z"))) .withCoder(INPUT_RECORD_CODER)) - .apply( - BigQueryIO.write() - .to("dataset-id.table-id") - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withSchema( - new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("strVal").setType("STRING"), - new TableFieldSchema().setName("longVal").setType("INTEGER"), - new TableFieldSchema().setName("doubleVal").setType("FLOAT"), - new TableFieldSchema().setName("instantVal").setType("TIMESTAMP")))) - .withTestServices(fakeBqServices) - .withAvroFormatFunction( - r -> { - GenericRecord rec = new GenericData.Record(r.getSchema()); - InputRecord i = r.getElement(); - rec.put("strVal", i.strVal()); - rec.put("longVal", i.longVal()); - rec.put("doubleVal", i.doubleVal()); - rec.put("instantVal", i.instantVal().getMillis() * 1000); - return rec; - }) - .withoutValidation()); + .apply(bqWrite); + p.run(); assertThat( fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), containsInAnyOrder( new TableRow() - .set("strVal", "test") - .set("longVal", "1") - .set("doubleVal", 1.0D) - .set("instantVal", "2019-01-01 00:00:00 UTC"), + .set("strval", "test") + .set("longval", "1") + .set("doubleval", 1.0) + .set( + "instantval", + useStorageApi || useStorageApiApproximate + ? String.valueOf(Instant.parse("2019-01-01T00:00:00Z").getMillis() * 1000) + : "2019-01-01 00:00:00 UTC"), new TableRow() - .set("strVal", "test2") - .set("longVal", "2") - .set("doubleVal", 2.0D) - .set("instantVal", "2019-02-01 00:00:00 UTC"))); + .set("strval", "test2") + .set("longval", "2") + .set("doubleval", 2.0D) + .set( + "instantval", + useStorageApi || useStorageApiApproximate + ? 
String.valueOf(Instant.parse("2019-02-01T00:00:00Z").getMillis() * 1000) + : "2019-02-01 00:00:00 UTC"))); + } + + @Test + public void testWriteAvro() throws Exception { + // only streaming inserts don't support avro types + assumeTrue(!useStreaming); + + runTestWriteAvro(false); + } + + @Test + public void testWriteAvroWithSchemaFromView() throws Exception { + // only exercised when the Storage API write path is in use + assumeTrue(useStorageApi); + + runTestWriteAvro(true); } @Test @@ -2156,7 +2191,7 @@ public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() { p.enableAbandonedNodeEnforcement(false); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Writing avro formatted data is only supported for FILE_LOADS"); + thrown.expectMessage("Avro output is not supported when method == STREAMING_INSERTS"); p.apply(Create.empty(INPUT_RECORD_CODER)) .apply( BigQueryIO.write()