From 685fc06c5fe242dac29556964a5eb86e0a02ba2d Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Nov 2022 20:34:34 -0300 Subject: [PATCH 1/8] initial impl --- .../beam/sdk/schemas/utils/AvroUtils.java | 18 +- .../AvroGenericRecordToStorageApiProto.java | 420 ++++++++++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 29 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 2 + .../sdk/io/gcp/bigquery/RowWriterFactory.java | 4 + ...geApiDynamicDestinationsGenericRecord.java | 87 ++++ ...vroGenericRecordToStorageApiProtoTest.java | 455 ++++++++++++++++++ .../io/gcp/bigquery/BigQueryIOWriteTest.java | 47 +- 8 files changed, 1031 insertions(+), 31 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index e61dbe505e22..b06e73739b10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -151,9 +151,13 @@ public class AvroUtils { } // Unwrap an AVRO schema into the base type an whether it is nullable. - static class TypeWithNullability { - public final org.apache.avro.Schema type; - public final boolean nullable; + public static class TypeWithNullability { + final org.apache.avro.Schema type; + final boolean nullable; + + public static TypeWithNullability create(org.apache.avro.Schema avroSchema) { + return new TypeWithNullability(avroSchema); + } TypeWithNullability(org.apache.avro.Schema avroSchema) { if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) { @@ -185,6 +189,14 @@ static class TypeWithNullability { nullable = false; } } + + public Boolean isNullable() { + return nullable; + } + + public org.apache.avro.Schema getType() { + return type; + } } /** Wrapper for fixed byte fields. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java new file mode 100644 index 000000000000..c45068738598 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol messages,
+ * for use with the Storage Write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, FieldDescriptorProto.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, FieldDescriptorProto.Type>builder()
+          .put(Schema.Type.INT, FieldDescriptorProto.Type.TYPE_INT32)
+          .put(Schema.Type.FIXED, FieldDescriptorProto.Type.TYPE_BYTES)
+          .put(Schema.Type.LONG, FieldDescriptorProto.Type.TYPE_INT64)
+          .put(Schema.Type.FLOAT, FieldDescriptorProto.Type.TYPE_FLOAT)
+          .put(Schema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
+          .put(Schema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
+          .put(Schema.Type.BOOLEAN, FieldDescriptorProto.Type.TYPE_BOOL)
+          .put(Schema.Type.ENUM, FieldDescriptorProto.Type.TYPE_STRING)
+          .put(Schema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
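+  // Avro identifies logical types by name (e.g. "timestamp-micros"), so this map is keyed by
+  // the LogicalType name rather than by Schema.Type.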
+  static final Map<String, FieldDescriptorProto.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, FieldDescriptorProto.Type>builder()
+          .put(LogicalTypes.date().getName(), FieldDescriptorProto.Type.TYPE_INT32)
+          .put(LogicalTypes.decimal(1).getName(), FieldDescriptorProto.Type.TYPE_BYTES)
+          .put(LogicalTypes.timestampMicros().getName(), FieldDescriptorProto.Type.TYPE_INT64)
+          .put(LogicalTypes.timestampMillis().getName(), FieldDescriptorProto.Type.TYPE_INT64)
+          .put(LogicalTypes.uuid().getName(), FieldDescriptorProto.Type.TYPE_STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, Functions.identity())
+          .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(Schema.Type.FLOAT, Function.identity())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type (days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer Descriptor that can be used to write data using
+   * the BigQuery Storage API.
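+   *
+   * <p>Illustrative sketch of the intended call sequence, assuming {@code record} is an Avro
+   * {@link GenericRecord}:
+   *
+   * <pre>{@code
+   * Descriptor descriptor = getDescriptorFromSchema(record.getSchema());
+   * DynamicMessage payload = messageFromGenericRecord(descriptor, record);
+   * }</pre>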
+ * + * @param schema An Avro Schema + * @return Returns the Descriptor created from the provided Schema + * @throws com.google.protobuf.Descriptors.DescriptorValidationException + */ + public static Descriptor getDescriptorFromSchema(Schema schema) + throws DescriptorValidationException { + DescriptorProto descriptorProto = descriptorSchemaFromAvroSchema(schema); + FileDescriptorProto fileDescriptorProto = + FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build(); + FileDescriptor fileDescriptor = + FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]); + + return Iterables.getOnlyElement(fileDescriptor.getMessageTypes()); + } + + /** + * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used + * to write data using the BigQuery Storage streaming API. + * + * @param descriptor The Descriptor for the DynamicMessage result + * @param record An Avro GenericRecord + * @return A dynamic message representation of a Proto payload to be used for StorageWrite API + */ + public static DynamicMessage messageFromGenericRecord( + Descriptor descriptor, GenericRecord record) { + Schema schema = record.getSchema(); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + for (Schema.Field field : schema.getFields()) { + FieldDescriptor fieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase())); + @Nullable + Object value = + messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record); + if (value != null) { + builder.setField(fieldDescriptor, value); + } + } + return builder.build(); + } + + @VisibleForTesting + static DescriptorProto descriptorSchemaFromAvroSchema(Schema schema) { + Preconditions.checkState(!schema.getFields().isEmpty()); + DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder(); + // Create a unique name for the descriptor ('-' characters cannot be used). + descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_")); + int i = 1; + List nestedTypes = Lists.newArrayList(); + for (Schema.Field field : schema.getFields()) { + FieldDescriptorProto.Builder fieldDescriptorProtoBuilder = + fieldDescriptorFromAvroField(field, i++, nestedTypes); + descriptorBuilder.addField(fieldDescriptorProtoBuilder); + } + nestedTypes.forEach(descriptorBuilder::addNestedType); + return descriptorBuilder.build(); + } + + static DescriptorProto mapDescriptorSchemaFromAvroSchema( + FieldDescriptorProto.Builder keyFieldDescriptor, + FieldDescriptorProto.Builder valueFieldDescriptor, + List nestedTypes) { + DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder(); + // Create a unique name for the descriptor ('-' characters cannot be used). 
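+    // BigQuery has no native map type: each Avro map is encoded as a repeated nested message
+    // with a "key" field and a "value" field, which this descriptor describes.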
+ descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_")); + descriptorBuilder.addField(keyFieldDescriptor); + descriptorBuilder.addField(valueFieldDescriptor); + nestedTypes.forEach(descriptorBuilder::addNestedType); + return descriptorBuilder.build(); + } + + private static FieldDescriptorProto.Builder fieldDescriptorFromAvroField( + Schema.Field field, int fieldNumber, List nestedTypes) { + @Nullable Schema schema = field.schema(); + FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder(); + fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.name().toLowerCase()); + fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber); + Schema elementType = null; + switch (schema.getType()) { + case RECORD: + if (schema == null) { + throw new RuntimeException("Unexpected null schema!"); + } + DescriptorProto nested = descriptorSchemaFromAvroSchema(schema); + nestedTypes.add(nested); + fieldDescriptorBuilder = + fieldDescriptorBuilder + .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) + .setTypeName(nested.getName()); + break; + case ARRAY: + elementType = TypeWithNullability.create(schema.getElementType()).getType(); + if (elementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + Preconditions.checkState( + elementType.getType() != Schema.Type.ARRAY, "Nested arrays not supported by BigQuery."); + return fieldDescriptorFromAvroField( + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()), + fieldNumber, + nestedTypes) + .setLabel(Label.LABEL_REPEATED); + case MAP: + Schema keyType = Schema.create(Schema.Type.STRING); + Schema valueType = TypeWithNullability.create(schema.getElementType()).getType(); + if (valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + + List nestedTypesMap = Lists.newArrayList(); + + DescriptorProto nestedMap = + mapDescriptorSchemaFromAvroSchema( + fieldDescriptorFromAvroField( + new Schema.Field( + "key", keyType, "key of the map entry", Schema.Field.NULL_VALUE), + 1, + nestedTypesMap), + fieldDescriptorFromAvroField( + new Schema.Field( + "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE), + 2, + nestedTypesMap), + nestedTypesMap); + + nestedTypes.add(nestedMap); + fieldDescriptorBuilder = + fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nestedMap.getName()); + + return fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED); + case UNION: + elementType = TypeWithNullability.create(schema).getType(); + if (elementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + // check to see if more than one non-null type is defined in the union + Preconditions.checkState( + elementType.getType() != Schema.Type.UNION, + "Multiple non-null union types are not supported."); + fieldDescriptorBuilder = + fieldDescriptorFromAvroField( + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()), + fieldNumber, + nestedTypes); + break; + default: + elementType = TypeWithNullability.create(schema).getType(); + @Nullable + FieldDescriptorProto.Type primitiveType = + Optional.ofNullable(LogicalTypes.fromSchema(elementType)) + .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName())) + .orElse(PRIMITIVE_TYPES.get(elementType.getType())); + if (primitiveType == null) { + throw new RuntimeException("Unsupported type " + elementType.getType()); + } + // a scalar will be required by default, if defined as part of union then + // caller will 
set nullability requirements + return fieldDescriptorBuilder.setType(primitiveType).setLabel(Label.LABEL_REQUIRED); + } + if (TypeWithNullability.create(schema).isNullable()) { + fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL); + } else { + fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED); + } + return fieldDescriptorBuilder; + } + + @Nullable + private static Object messageValueFromGenericRecordValue( + FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) { + @Nullable Object value = record.get(name); + if (value == null) { + if (fieldDescriptor.isOptional()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field " + fieldDescriptor.getName()); + } + } + return toProtoValue(fieldDescriptor, avroField.schema(), value); + } + + private static Object toProtoValue( + FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) { + switch (avroSchema.getType()) { + case RECORD: + return messageFromGenericRecord(fieldDescriptor.getMessageType(), (GenericRecord) value); + case ARRAY: + List list = (List) value; + @Nullable Schema arrayElementType = avroSchema.getElementType(); + if (arrayElementType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + return list.stream() + .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v)) + .collect(Collectors.toList()); + case UNION: + TypeWithNullability type = TypeWithNullability.create(avroSchema); + Preconditions.checkState( + type.getType().getType() != Schema.Type.UNION, + "Multiple non-null union types are not supported."); + return toProtoValue(fieldDescriptor, type.getType(), value); + case MAP: + Map map = (Map) value; + Schema valueType = TypeWithNullability.create(avroSchema.getElementType()).getType(); + if (valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + + return map.entrySet().stream() + .map( + (Map.Entry entry) -> + mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry)) + .collect(Collectors.toList()); + default: + return scalarToProtoValue(avroSchema, value); + } + } + + static Object mapEntryToProtoValue( + Descriptor descriptor, Schema valueFieldType, Map.Entry entryValue) { + + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + FieldDescriptor keyFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("key")); + @Nullable + Object key = + toProtoValue(keyFieldDescriptor, Schema.create(Schema.Type.STRING), entryValue.getKey()); + if (key != null) { + builder.setField(keyFieldDescriptor, key); + } + FieldDescriptor valueFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("value")); + @Nullable + Object value = toProtoValue(valueFieldDescriptor, valueFieldType, entryValue.getValue()); + if (value != null) { + builder.setField(valueFieldDescriptor, value); + } + return builder.build(); + } + + @VisibleForTesting + static Object scalarToProtoValue(Schema fieldSchema, Object value) { + TypeWithNullability type = TypeWithNullability.create(fieldSchema); + LogicalType logicalType = LogicalTypes.fromSchema(type.getType()); + if (logicalType != null) { + @Nullable + BiFunction logicalTypeEncoder = + LOGICAL_TYPE_ENCODERS.get(logicalType.getName()); + if (logicalTypeEncoder == null) { + throw new IllegalArgumentException("Unsupported logical type " + logicalType.getName()); + } + return logicalTypeEncoder.apply(logicalType, value); 
+ } else { + @Nullable Function encoder = PRIMITIVE_ENCODERS.get(type.getType().getType()); + if (encoder == null) { + throw new RuntimeException("Unexpected beam type " + fieldSchema); + } + return encoder.apply(value); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e72bc20f780e..8046442507b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2880,15 +2880,6 @@ public WriteResult expand(PCollection input) { "withAutoSchemaUpdate only supported when using storage-api writes."); } - if (method != Write.Method.FILE_LOADS) { - // we only support writing avro for FILE_LOADS - checkArgument( - getAvroRowWriterFactory() == null, - "Writing avro formatted data is only supported for FILE_LOADS, however " - + "the method was %s", - method); - } - if (input.isBounded() == IsBounded.BOUNDED) { checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input."); } @@ -3165,6 +3156,26 @@ private WriteResult continueExpandTyped( storageApiDynamicDestinations = new StorageApiDynamicDestinationsBeamRow<>( dynamicDestinations, elementSchema, elementToRowFunction); + } else if (getAvroRowWriterFactory() != null) { + // we can configure the avro to storage write api proto converter for this + // assuming the format function returns an Avro GenericRecord + // and there is a schema defined + checkArgument( + getJsonSchema() != null + || getDynamicDestinations() != null + || getSchemaFromView() != null, + "A schema must be provided for avro rows to be used with StorageWrite API."); + + RowWriterFactory.AvroRowWriterFactory + recordWriterFactory = + (RowWriterFactory.AvroRowWriterFactory) + rowWriterFactory; + SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory = + Optional.ofNullable(getAvroSchemaFactory()).orElse(DEFAULT_AVRO_SCHEMA_FACTORY); + + storageApiDynamicDestinations = + new StorageApiDynamicDestinationsGenericRecord<>( + dynamicDestinations, avroSchemaFactory, recordWriterFactory.getToAvroFn()); } else { RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = (RowWriterFactory.TableRowWriterFactory) rowWriterFactory; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 6a340496122b..76ff7da636fc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -265,6 +266,7 @@ public abstract static class Builder { .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME) .put("SqlTimeWithLocalTzType", 
StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)
           .build();
 
   private static final String BIGQUERY_MAP_KEY_FIELD_NAME = "key";
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
index 710e9d77c8af..21bf9ae74adf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
@@ -119,6 +119,10 @@ AvroRowWriterFactory prepare(
     return new AvroRowWriterFactory<>(toAvro, writerFactory, schemaFactory, dynamicDestinations);
   }
 
+  SerializableFunction<AvroWriteRequest<ElementT>, AvroT> getToAvroFn() {
+    return toAvro;
+  }
+
   @Override
   OutputType getOutputType() {
     return OutputType.AvroGenericRecord;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
new file mode 100644
index 000000000000..2d5efa4f586a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Storage API DynamicDestinations used when the input is an Avro {@link GenericRecord}.
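+ *
+ * <p>Incoming elements are first mapped to {@link GenericRecord} with the user-supplied function
+ * and then encoded as dynamic proto messages using the destination's Avro schema.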
+ */
+class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+
+  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
+  private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
+
+  StorageApiDynamicDestinationsGenericRecord(
+      DynamicDestinations<T, DestinationT> inner,
+      SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
+      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord) {
+    super(inner);
+    this.toGenericRecord = toGenericRecord;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws Exception {
+    return new MessageConverter<T>() {
+      final Descriptor descriptor;
+      final long descriptorHash;
+      final Schema avroSchema;
+      final TableSchema tableSchema;
+
+      {
+        avroSchema = schemaFactory.apply(getSchema(destination));
+        tableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema));
+        descriptor = AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(avroSchema);
+        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
+      }
+
+      @Override
+      public DescriptorWrapper getSchemaDescriptor() {
+        return new DescriptorWrapper(descriptor, descriptorHash);
+      }
+
+      @Override
+      public void refreshSchema(long expectedHash) {}
+
+      @Override
+      public StorageApiWritePayload toMessage(T element) {
+        Message msg =
+            AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+                descriptor, toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)));
+        return new AutoValue_StorageApiWritePayload(msg.toByteArray(), descriptorHash);
+      }
+
+      @Override
+      public TableRow toTableRow(T element) {
+        return BigQueryUtils.convertGenericRecordToTableRow(
+            toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), tableSchema);
+      }
+    };
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
new file mode 100644
index 000000000000..b1fa63776e4a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import com.google.protobuf.DescriptorProtos.DescriptorProto; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.EnumSet; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Days; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/** Unit tests form {@link AvroGenericRecordToStorageApiProto}. */ +public class AvroGenericRecordToStorageApiProtoTest { + + enum TestEnum { + ONE, + TWO, + RED, + BLUE + } + + private static final String[] TEST_ENUM_STRS = + EnumSet.allOf(TestEnum.class).stream().map(TestEnum::name).toArray(String[]::new); + + private static final Schema BASE_SCHEMA = + SchemaBuilder.record("TestRecord") + .fields() + .optionalBytes("bytesValue") + .requiredInt("int32Value") + .optionalLong("int64Value") + .optionalFloat("floatValue") + .optionalDouble("doubleValue") + .optionalString("stringValue") + .optionalBoolean("booleanValue") + .name("arrayValue") + .type() + .array() + .items() + .stringType() + .noDefault() + .name("enumValue") + .type() + .optional() + .enumeration("testEnum") + .symbols(TEST_ENUM_STRS) + .name("fixedValue") + .type() + .fixed("MD5") + .size(16) + .noDefault() + .endRecord(); + + private static final Schema LOGICAL_TYPES_SCHEMA = + SchemaBuilder.record("LogicalTypesRecord") + .fields() + .name("decimalValue") + .type(LogicalTypes.decimal(1, 1).addToSchema(Schema.create(Schema.Type.BYTES))) + .noDefault() + .name("dateValue") + .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))) + .noDefault() + .name("timestampMicrosValue") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestampMillisValue") + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestampMicrosAsInstantValue") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("uuidValue") + .type(LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))) + .noDefault() + .endRecord(); + + private static final DescriptorProto BASE_SCHEMA_PROTO = + DescriptorProto.newBuilder() + .addField( + 
FieldDescriptorProto.newBuilder() + .setName("bytesvalue") + .setNumber(1) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("int32value") + .setNumber(2) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("int64value") + .setNumber(3) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("floatvalue") + .setNumber(4) + .setType(Type.TYPE_FLOAT) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("doublevalue") + .setNumber(5) + .setType(Type.TYPE_DOUBLE) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("stringvalue") + .setNumber(6) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("booleanvalue") + .setNumber(7) + .setType(Type.TYPE_BOOL) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("arrayvalue") + .setNumber(8) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_REPEATED) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("enumvalue") + .setNumber(9) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("fixedvalue") + .setNumber(10) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_REQUIRED) + .build()) + .build(); + + private static final DescriptorProto LOGICAL_TYPES_SCHEMA_PROTO = + DescriptorProto.newBuilder() + .addField( + FieldDescriptorProto.newBuilder() + .setName("decimalvalue") + .setNumber(1) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("datevalue") + .setNumber(2) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmicrosvalue") + .setNumber(3) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmillisvalue") + .setNumber(4) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampmicrosasinstantvalue") + .setNumber(5) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("uuidvalue") + .setNumber(6) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .build(); + + private static final byte[] BYTES = "BYTE BYTE BYTE".getBytes(StandardCharsets.UTF_8); + + private static final Schema NESTED_SCHEMA = + SchemaBuilder.record("TestNestedRecord") + .fields() + .name("nested") + .type() + .optional() + .type(BASE_SCHEMA) + .name("nestedArray") + .type() + .array() + .items(BASE_SCHEMA) + .noDefault() + .endRecord(); + + private static GenericRecord baseRecord; + private static GenericRecord logicalTypesRecord; + private static Map baseProtoExpectedFields; + private static Map logicalTypesProtoExpectedFields; + private static GenericRecord nestedRecord; + + static { + try { + byte[] md5 = MessageDigest.getInstance("MD5").digest(BYTES); + Instant now = Instant.now(); + baseRecord = + new GenericRecordBuilder(BASE_SCHEMA) + 
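              // Populate every BASE_SCHEMA field with fixed fixture values used by the assertions below.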
.set("bytesValue", BYTES) + .set("int32Value", (int) 3) + .set("int64Value", (long) 4) + .set("floatValue", (float) 3.14) + .set("doubleValue", (double) 2.68) + .set("stringValue", "I am a string. Hear me roar.") + .set("booleanValue", true) + .set("arrayValue", ImmutableList.of("one", "two", "red", "blue")) + .set( + "enumValue", + new GenericData.EnumSymbol( + BASE_SCHEMA.getField("enumValue").schema(), TestEnum.TWO)) + .set( + "fixedValue", + new GenericData.Fixed(BASE_SCHEMA.getField("fixedValue").schema(), md5)) + .build(); + BigDecimal bd = BigDecimal.valueOf(1.1D); + UUID uuid = UUID.randomUUID(); + logicalTypesRecord = + new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) + .set( + "decimalValue", + new Conversions.DecimalConversion() + .toBytes(bd, Schema.create(Schema.Type.NULL), LogicalTypes.decimal(1, 1))) + .set("dateValue", now) + .set("timestampMicrosValue", now.getMillis() * 1000) + .set("timestampMicrosAsInstantValue", now) + .set("timestampMillisValue", now.toDateTime()) + .set("uuidValue", uuid) + .build(); + baseProtoExpectedFields = + ImmutableMap.builder() + .put("bytesvalue", ByteString.copyFrom(BYTES)) + .put("int32value", (int) 3) + .put("int64value", (long) 4) + .put("floatvalue", (float) 3.14) + .put("doublevalue", (double) 2.68) + .put("stringvalue", "I am a string. Hear me roar.") + .put("booleanvalue", true) + .put("arrayvalue", ImmutableList.of("one", "two", "red", "blue")) + .put("enumvalue", TEST_ENUM_STRS[1]) + .put("fixedvalue", ByteString.copyFrom(md5)) + .build(); + logicalTypesProtoExpectedFields = + ImmutableMap.builder() + .put("decimalvalue", BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bd)) + .put( + "datevalue", + Days.daysBetween(Instant.EPOCH.toDateTime(), now.toDateTime()).getDays()) + .put("timestampmicrosvalue", now.getMillis() * 1000) + .put("timestampmicrosasinstantvalue", now.getMillis() * 1000) + .put("timestampmillisvalue", now.getMillis()) + .put("uuidvalue", uuid.toString()) + .build(); + nestedRecord = + new GenericRecordBuilder(NESTED_SCHEMA) + .set("nested", baseRecord) + .set("nestedArray", ImmutableList.of(baseRecord, baseRecord)) + .build(); + } catch (NoSuchAlgorithmException ex) { + throw new RuntimeException("Error initializing test data", ex); + } + } + + void validateDescriptorAgainstSchema(Schema originalSchema, DescriptorProto schemaProto) { + DescriptorProto descriptor = + AvroGenericRecordToStorageApiProto.descriptorSchemaFromAvroSchema(originalSchema); + Map types = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + Map expectedTypes = + schemaProto.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedTypes, types); + + Map nameMapping = + originalSchema.getFields().stream() + .collect(Collectors.toMap(f -> f.name().toLowerCase(), f -> f.name())); + descriptor + .getFieldList() + .forEach( + p -> { + TypeWithNullability fieldSchema = + TypeWithNullability.create( + originalSchema.getField(nameMapping.get(p.getName())).schema()); + Label label = + fieldSchema.getType().getType() == Schema.Type.ARRAY + ? Label.LABEL_REPEATED + : fieldSchema.isNullable() ? 
Label.LABEL_OPTIONAL : Label.LABEL_REQUIRED; + assertEquals(label, p.getLabel()); + }); + } + + @Test + public void testDescriptorFromSchema() { + validateDescriptorAgainstSchema(BASE_SCHEMA, BASE_SCHEMA_PROTO); + } + + @Test + public void testDescriptorFromSchemaLogicalTypes() { + validateDescriptorAgainstSchema(LOGICAL_TYPES_SCHEMA, LOGICAL_TYPES_SCHEMA_PROTO); + } + + @Test + public void testNestedFromSchema() { + DescriptorProto descriptor = + AvroGenericRecordToStorageApiProto.descriptorSchemaFromAvroSchema(NESTED_SCHEMA); + Map expectedBaseTypes = + BASE_SCHEMA_PROTO.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + + Map types = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + Map typeNames = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName)); + Map typeLabels = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel)); + + assertEquals(2, types.size()); + + Map nestedTypes = + descriptor.getNestedTypeList().stream() + .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); + assertEquals(2, nestedTypes.size()); + assertEquals(Type.TYPE_MESSAGE, types.get("nested")); + assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested")); + String nestedTypeName1 = typeNames.get("nested"); + Map nestedTypes1 = + nestedTypes.get(nestedTypeName1).getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedBaseTypes, nestedTypes1); + + assertEquals(Type.TYPE_MESSAGE, types.get("nestedarray")); + assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarray")); + String nestedTypeName2 = typeNames.get("nestedarray"); + Map nestedTypes2 = + nestedTypes.get(nestedTypeName2).getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedBaseTypes, nestedTypes2); + } + + private void assertBaseRecord(DynamicMessage msg, Map expectedFields) { + Map recordFields = + msg.getAllFields().entrySet().stream() + .collect( + Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); + assertEquals(expectedFields, recordFields); + } + + @Test + public void testMessageFromGenericRecord() throws Exception { + Descriptors.Descriptor descriptor = + AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(NESTED_SCHEMA); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, nestedRecord); + + assertEquals(2, msg.getAllFields().size()); + + Map fieldDescriptors = + descriptor.getFields().stream() + .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity())); + DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested")); + assertBaseRecord(nestedMsg, baseProtoExpectedFields); + } + + @Test + public void testMessageFromGenericRecordLogicalTypes() throws Exception { + Descriptors.Descriptor descriptor = + AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(LOGICAL_TYPES_SCHEMA); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, logicalTypesRecord); + assertEquals(6, msg.getAllFields().size()); + assertBaseRecord(msg, logicalTypesProtoExpectedFields); + } +} 
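+// Illustrative pipeline usage of the write path exercised above (table name and schema are
+// assumed; a schema must be provided for avro rows with the Storage Write API):
+//   records.apply(
+//       BigQueryIO.<GenericRecord>write()
+//           .to("project:dataset.table")
+//           .withSchema(tableSchema)
+//           .withAvroFormatFunction(AvroWriteRequest::getElement)
+//           .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));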
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 1e1749e8569a..f8c09ad395ce 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1079,7 +1079,8 @@ public static InputRecord create( @Test public void testWriteAvro() throws Exception { - if (useStorageApi || useStreaming) { + if (useStreaming) { + // only streaming inserts does not support avro types return; } p.apply( @@ -1089,25 +1090,25 @@ public void testWriteAvro() throws Exception { .withCoder(INPUT_RECORD_CODER)) .apply( BigQueryIO.write() - .to("dataset-id.table-id") + .to("project-id:dataset-id.table-id") .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withSchema( new TableSchema() .setFields( ImmutableList.of( - new TableFieldSchema().setName("strVal").setType("STRING"), - new TableFieldSchema().setName("longVal").setType("INTEGER"), - new TableFieldSchema().setName("doubleVal").setType("FLOAT"), - new TableFieldSchema().setName("instantVal").setType("TIMESTAMP")))) + new TableFieldSchema().setName("strval").setType("STRING"), + new TableFieldSchema().setName("longval").setType("INTEGER"), + new TableFieldSchema().setName("doubleval").setType("FLOAT"), + new TableFieldSchema().setName("instantval").setType("TIMESTAMP")))) .withTestServices(fakeBqServices) .withAvroFormatFunction( r -> { GenericRecord rec = new GenericData.Record(r.getSchema()); InputRecord i = r.getElement(); - rec.put("strVal", i.strVal()); - rec.put("longVal", i.longVal()); - rec.put("doubleVal", i.doubleVal()); - rec.put("instantVal", i.instantVal().getMillis() * 1000); + rec.put("strval", i.strVal()); + rec.put("longval", i.longVal()); + rec.put("doubleval", i.doubleVal()); + rec.put("instantval", i.instantVal().getMillis() * 1000); return rec; }) .withoutValidation()); @@ -1117,15 +1118,23 @@ public void testWriteAvro() throws Exception { fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), containsInAnyOrder( new TableRow() - .set("strVal", "test") - .set("longVal", "1") - .set("doubleVal", 1.0D) - .set("instantVal", "2019-01-01 00:00:00 UTC"), + .set("strval", "test") + .set("longval", "1") + .set("doubleval", 1.0) + .set( + "instantval", + useStorageApi || useStorageApiApproximate + ? String.valueOf(Instant.parse("2019-01-01T00:00:00Z").getMillis() * 1000) + : "2019-01-01 00:00:00 UTC"), new TableRow() - .set("strVal", "test2") - .set("longVal", "2") - .set("doubleVal", 2.0D) - .set("instantVal", "2019-02-01 00:00:00 UTC"))); + .set("strval", "test2") + .set("longval", "2") + .set("doubleval", 2.0D) + .set( + "instantval", + useStorageApi || useStorageApiApproximate + ? 
String.valueOf(Instant.parse("2019-02-01T00:00:00Z").getMillis() * 1000)
                        : "2019-02-01 00:00:00 UTC")));
   }
 
   @Test
@@ -2014,7 +2023,7 @@ public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() {
     p.enableAbandonedNodeEnforcement(false);
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Writing avro formatted data is only supported for FILE_LOADS");
+    thrown.expectMessage("Avro output is not supported when method == STREAMING_INSERTS");
     p.apply(Create.empty(INPUT_RECORD_CODER))
         .apply(
             BigQueryIO.write()

From 4be8f9d85ea6be2e373486ccce083a8ba35aad79 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Mon, 21 Nov 2022 15:23:25 -0300
Subject: [PATCH 2/8] force unit tests

---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index f8c09ad395ce..e66345f2ddb8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1080,7 +1080,7 @@ public static InputRecord create(
   @Test
   public void testWriteAvro() throws Exception {
     if (useStreaming) {
-      // only streaming inserts does not support avro types
+      // only streaming inserts don't support avro types
       return;
     }
     p.apply(

From 4dcf9a47feb97621712b3c529c54cd8771f9f243 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Wed, 30 Nov 2022 19:09:39 -0800
Subject: [PATCH 3/8] addressing comments

---
 .../beam/sdk/schemas/utils/AvroUtils.java     |   4 +-
 .../AvroGenericRecordToStorageApiProto.java   | 200 +++++++-----------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  |  21 +-
 ...geApiDynamicDestinationsGenericRecord.java |  61 +++---
 ...vroGenericRecordToStorageApiProtoTest.java |  36 ++--
 5 files changed, 154 insertions(+), 168 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index b06e73739b10..371e07187d6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -150,7 +150,9 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
+  /**
+   * Unwrap an AVRO schema into the base type and whether it is nullable.
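+   *
+   * <p>For example, a union schema of {@code ["null", "string"]} unwraps to a STRING base type
+   * with nullable set to true.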
+   */
   public static class TypeWithNullability {
     final org.apache.avro.Schema type;
     final boolean nullable;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index c45068738598..a42614e01ec5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -17,16 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos.DescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
-import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
 import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor;
 import com.google.protobuf.DynamicMessage;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -49,8 +44,6 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Days;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
@@ -61,35 +54,35 @@
  */
 public class AvroGenericRecordToStorageApiProto {
 
-  static final Map<Schema.Type, FieldDescriptorProto.Type> PRIMITIVE_TYPES =
-      ImmutableMap.<Schema.Type, FieldDescriptorProto.Type>builder()
-          .put(Schema.Type.INT, FieldDescriptorProto.Type.TYPE_INT32)
-          .put(Schema.Type.FIXED, FieldDescriptorProto.Type.TYPE_BYTES)
-          .put(Schema.Type.LONG, FieldDescriptorProto.Type.TYPE_INT64)
-          .put(Schema.Type.FLOAT, FieldDescriptorProto.Type.TYPE_FLOAT)
-          .put(Schema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
-          .put(Schema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
-          .put(Schema.Type.BOOLEAN, FieldDescriptorProto.Type.TYPE_BOOL)
-          .put(Schema.Type.ENUM, FieldDescriptorProto.Type.TYPE_STRING)
-          .put(Schema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
           .build();
 
   // A map of supported logical types to the protobuf field type.
-  static final Map<String, FieldDescriptorProto.Type> LOGICAL_TYPES =
-      ImmutableMap.<String, FieldDescriptorProto.Type>builder()
-          .put(LogicalTypes.date().getName(), FieldDescriptorProto.Type.TYPE_INT32)
-          .put(LogicalTypes.decimal(1).getName(), FieldDescriptorProto.Type.TYPE_BYTES)
-          .put(LogicalTypes.timestampMicros().getName(), FieldDescriptorProto.Type.TYPE_INT64)
-          .put(LogicalTypes.timestampMillis().getName(), FieldDescriptorProto.Type.TYPE_INT64)
-          .put(LogicalTypes.uuid().getName(), FieldDescriptorProto.Type.TYPE_STRING)
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
           .build();
 
   static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
       ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
-          .put(Schema.Type.INT, Functions.identity())
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
           .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
           .put(Schema.Type.LONG, Functions.identity())
-          .put(Schema.Type.FLOAT, Function.identity())
+          .put(Schema.Type.FLOAT, o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
           .put(Schema.Type.DOUBLE, Function.identity())
           .put(Schema.Type.STRING, Function.identity())
           .put(Schema.Type.BOOLEAN, Function.identity())
@@ -154,22 +147,20 @@ static ByteString convertDecimal(LogicalType logicalType, Object value) {
   }
 
   /**
-   * Given an Avro Schema, returns a protocol-buffer Descriptor that can be used to write data using
-   * the BigQuery Storage API.
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
+   * through the BigQuery Storage API.
    *
    * @param schema An Avro Schema
-   * @return Returns the Descriptor created from the provided Schema
-   * @throws com.google.protobuf.Descriptors.DescriptorValidationException
+   * @return Returns the TableSchema created from the provided Schema
    */
-  public static Descriptor getDescriptorFromSchema(Schema schema)
-      throws DescriptorValidationException {
-    DescriptorProto descriptorProto = descriptorSchemaFromAvroSchema(schema);
-    FileDescriptorProto fileDescriptorProto =
-        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
-    FileDescriptor fileDescriptor =
-        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
 
-    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
   }
 
   /**
@@ -197,54 +188,21 @@ public static DynamicMessage messageFromGenericRecord(
     return builder.build();
   }
 
-  @VisibleForTesting
-  static DescriptorProto descriptorSchemaFromAvroSchema(Schema schema) {
-    Preconditions.checkState(!schema.getFields().isEmpty());
-    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
-    // Create a unique name for the descriptor ('-' characters cannot be used).
- descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_")); - int i = 1; - List nestedTypes = Lists.newArrayList(); - for (Schema.Field field : schema.getFields()) { - FieldDescriptorProto.Builder fieldDescriptorProtoBuilder = - fieldDescriptorFromAvroField(field, i++, nestedTypes); - descriptorBuilder.addField(fieldDescriptorProtoBuilder); - } - nestedTypes.forEach(descriptorBuilder::addNestedType); - return descriptorBuilder.build(); - } - - static DescriptorProto mapDescriptorSchemaFromAvroSchema( - FieldDescriptorProto.Builder keyFieldDescriptor, - FieldDescriptorProto.Builder valueFieldDescriptor, - List nestedTypes) { - DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder(); - // Create a unique name for the descriptor ('-' characters cannot be used). - descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_")); - descriptorBuilder.addField(keyFieldDescriptor); - descriptorBuilder.addField(valueFieldDescriptor); - nestedTypes.forEach(descriptorBuilder::addNestedType); - return descriptorBuilder.build(); - } - - private static FieldDescriptorProto.Builder fieldDescriptorFromAvroField( - Schema.Field field, int fieldNumber, List nestedTypes) { + private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) { @Nullable Schema schema = field.schema(); - FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder(); - fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.name().toLowerCase()); - fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber); + TableFieldSchema.Builder builder = + TableFieldSchema.newBuilder().setName(field.name().toLowerCase()); Schema elementType = null; switch (schema.getType()) { case RECORD: if (schema == null) { throw new RuntimeException("Unexpected null schema!"); } - DescriptorProto nested = descriptorSchemaFromAvroSchema(schema); - nestedTypes.add(nested); - fieldDescriptorBuilder = - fieldDescriptorBuilder - .setType(FieldDescriptorProto.Type.TYPE_MESSAGE) - .setTypeName(nested.getName()); + Preconditions.checkState(!schema.getFields().isEmpty()); + builder = builder.setType(TableFieldSchema.Type.STRUCT); + for (Schema.Field recordField : schema.getFields()) { + builder = builder.addFields(fieldDescriptorFromAvroField(recordField)); + } break; case ARRAY: elementType = TypeWithNullability.create(schema.getElementType()).getType(); @@ -253,39 +211,34 @@ private static FieldDescriptorProto.Builder fieldDescriptorFromAvroField( } Preconditions.checkState( elementType.getType() != Schema.Type.ARRAY, "Nested arrays not supported by BigQuery."); - return fieldDescriptorFromAvroField( - new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()), - fieldNumber, - nestedTypes) - .setLabel(Label.LABEL_REPEATED); + + TableFieldSchema elementFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal())); + builder = builder.setType(elementFieldSchema.getType()); + builder.addAllFields(elementFieldSchema.getFieldsList()); + builder = builder.setMode(TableFieldSchema.Mode.REPEATED); + break; case MAP: Schema keyType = Schema.create(Schema.Type.STRING); Schema valueType = TypeWithNullability.create(schema.getElementType()).getType(); if (valueType == null) { throw new RuntimeException("Unexpected null element type!"); } - - List nestedTypesMap = Lists.newArrayList(); - - DescriptorProto nestedMap = - mapDescriptorSchemaFromAvroSchema( - 
fieldDescriptorFromAvroField( - new Schema.Field( - "key", keyType, "key of the map entry", Schema.Field.NULL_VALUE), - 1, - nestedTypesMap), - fieldDescriptorFromAvroField( - new Schema.Field( - "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE), - 2, - nestedTypesMap), - nestedTypesMap); - - nestedTypes.add(nestedMap); - fieldDescriptorBuilder = - fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nestedMap.getName()); - - return fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED); + TableFieldSchema keyFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE)); + TableFieldSchema valueFieldSchema = + fieldDescriptorFromAvroField( + new Schema.Field( + "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE)); + builder = + builder + .setType(TableFieldSchema.Type.STRUCT) + .addFields(keyFieldSchema) + .addFields(valueFieldSchema) + .setMode(TableFieldSchema.Mode.REPEATED); + break; case UNION: elementType = TypeWithNullability.create(schema).getType(); if (elementType == null) { @@ -295,16 +248,18 @@ private static FieldDescriptorProto.Builder fieldDescriptorFromAvroField( Preconditions.checkState( elementType.getType() != Schema.Type.UNION, "Multiple non-null union types are not supported."); - fieldDescriptorBuilder = + TableFieldSchema unionFieldSchema = fieldDescriptorFromAvroField( - new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()), - fieldNumber, - nestedTypes); + new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal())); + builder = + builder + .setType(unionFieldSchema.getType()) + .addAllFields(unionFieldSchema.getFieldsList()); break; default: elementType = TypeWithNullability.create(schema).getType(); @Nullable - FieldDescriptorProto.Type primitiveType = + TableFieldSchema.Type primitiveType = Optional.ofNullable(LogicalTypes.fromSchema(elementType)) .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName())) .orElse(PRIMITIVE_TYPES.get(elementType.getType())); @@ -313,14 +268,19 @@ private static FieldDescriptorProto.Builder fieldDescriptorFromAvroField( } // a scalar will be required by default, if defined as part of union then // caller will set nullability requirements - return fieldDescriptorBuilder.setType(primitiveType).setLabel(Label.LABEL_REQUIRED); + builder = builder.setType(primitiveType); } - if (TypeWithNullability.create(schema).isNullable()) { - fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL); - } else { - fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED); + if (builder.getMode() != TableFieldSchema.Mode.REPEATED) { + if (TypeWithNullability.create(schema).isNullable()) { + builder = builder.setMode(TableFieldSchema.Mode.NULLABLE); + } else { + builder = builder.setMode(TableFieldSchema.Mode.REQUIRED); + } + } + if (field.doc() != null) { + builder = builder.setDescription(field.doc()); } - return fieldDescriptorBuilder; + return builder.build(); } @Nullable diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4c91c0ec5faa..050f87e16257 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -540,7 
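For orientation, with this change the conversion composes in two hops instead of building protobuf descriptors directly. A rough usage sketch, not part of the patch itself (the record schema and field name here are invented; the helper calls are the ones introduced above and reused later in this series):

    // Sketch only. Assumes a trivial Avro schema with one string field.
    org.apache.avro.Schema avroSchema =
        org.apache.avro.SchemaBuilder.record("Example").fields().requiredString("name").endRecord();

    // Avro schema -> Storage Write API TableSchema (added by this patch).
    com.google.cloud.bigquery.storage.v1.TableSchema tableSchema =
        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema);

    // TableSchema -> protobuf Descriptor, reusing the existing TableRow conversion path.
    com.google.protobuf.Descriptors.Descriptor descriptor =
        TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);

    // GenericRecord -> DynamicMessage, ready to append to a Storage API write stream.
    org.apache.avro.generic.GenericRecord record =
        new org.apache.avro.generic.GenericData.Record(avroSchema);
    record.put("name", "beam");
    com.google.protobuf.DynamicMessage message =
        AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, record);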
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4c91c0ec5faa..050f87e16257 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -540,7 +540,13 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER = input -> input;
+
+  /**
+   * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
+   * PCollection<GenericRecord>} directly to BigQueryIO.Write.
+   */
+  static final SerializableFunction<GenericRecord, GenericRecord> GENERIC_RECORD_IDENTITY_FORMATTER = input -> input;
 
   static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
       GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
@@ -1925,7 +1931,18 @@ public static <T> Write<T> write() {
    * Write#withFormatFunction(SerializableFunction)}.
    */
   public static Write<TableRow> writeTableRows() {
-    return BigQueryIO.write().withFormatFunction(IDENTITY_FORMATTER);
+    return BigQueryIO.write().withFormatFunction(TABLE_ROW_IDENTITY_FORMATTER);
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
+   * GenericRecords} to a BigQuery table.
+   *
+   * <p>It is recommended to instead use {@link #write} with {@link
+   * Write#withFormatFunction(SerializableFunction)}.
+   */
+  public static Write<GenericRecord> writeTableRows() {
+    return BigQueryIO.write().withFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
   }
 
   /** Implementation of {@link #write}. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index 2d5efa4f586a..e4ba233e78ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -48,40 +48,41 @@ class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      final Descriptor descriptor;
-      final long descriptorHash;
-      final Schema avroSchema;
-      final TableSchema tableSchema;
+    return new GenericRecordConverter(destination);
+  }
+
+  class GenericRecordConverter implements MessageConverter<T> {
 
-      {
-        avroSchema = schemaFactory.apply(getSchema(destination));
-        tableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema));
-        descriptor = AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(avroSchema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final Schema avroSchema;
+    final TableSchema bqTableSchema;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        return new DescriptorWrapper(descriptor, descriptorHash);
-      }
+    GenericRecordConverter(DestinationT destination) throws Exception {
+      avroSchema = schemaFactory.apply(getSchema(destination));
+      bqTableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema));
+      protoTableSchema =
+          AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema);
+      descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true);
+    }
 
-      @Override
-      public void refreshSchema(long expectedHash) {}
+    @Override
+    public StorageApiWritePayload toMessage(T element) {
+      Message msg =
+          AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+              descriptor, toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)));
+      return new AutoValue_StorageApiWritePayload(msg.toByteArray());
+    }
 
-      @Override
-      public StorageApiWritePayload toMessage(T element) {
-        Message msg =
-            AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
-                descriptor, toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)));
-        return new AutoValue_StorageApiWritePayload(msg.toByteArray(), descriptorHash);
-      }
+    @Override
+    public TableRow toTableRow(T element) {
+      return BigQueryUtils.convertGenericRecordToTableRow(
+          toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema);
+    }
 
-      @Override
-      public TableRow toTableRow(T element) {
-        return BigQueryUtils.convertGenericRecordToTableRow(
-            toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), tableSchema);
-      }
-    };
+    @Override
+    public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
+      return protoTableSchema;
+    }
   }
 }
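Note that, as written, this commit leaves two parameterless writeTableRows() methods differing only in their generic return type, which Java does not permit; PATCH 4/8 below renames the GenericRecord variant to writeGenericRecords(). A minimal pipeline sketch of the intended entry point, using that later name (the table reference is a placeholder, and bqSchema is an assumed com.google.api.services.bigquery.model.TableSchema describing the destination table):

    // Sketch only; assumes the writeGenericRecords() name from PATCH 4/8,
    // plus the avroSchema, record and bqSchema values defined elsewhere.
    Pipeline p = Pipeline.create();
    p.apply(Create.of(record).withCoder(AvroCoder.of(avroSchema)))
        .apply(
            BigQueryIO.writeGenericRecords()
                .to("project-id:dataset-id.table-id")
                .withSchema(bqSchema)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    p.run();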
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index b1fa63776e4a..93848a55d4a6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -72,8 +72,8 @@ enum TestEnum {
       SchemaBuilder.record("TestRecord")
           .fields()
           .optionalBytes("bytesValue")
-          .requiredInt("int32Value")
-          .optionalLong("int64Value")
+          .requiredInt("intValue")
+          .optionalLong("longValue")
           .optionalFloat("floatValue")
           .optionalDouble("doubleValue")
           .optionalString("stringValue")
@@ -130,14 +130,14 @@ enum TestEnum {
               .build())
       .addField(
           FieldDescriptorProto.newBuilder()
-              .setName("int32value")
+              .setName("intvalue")
               .setNumber(2)
-              .setType(Type.TYPE_INT32)
+              .setType(Type.TYPE_INT64)
               .setLabel(Label.LABEL_OPTIONAL)
               .build())
       .addField(
           FieldDescriptorProto.newBuilder()
-              .setName("int64value")
+              .setName("longvalue")
               .setNumber(3)
               .setType(Type.TYPE_INT64)
               .setLabel(Label.LABEL_OPTIONAL)
               .build())
@@ -146,7 +146,7 @@ enum TestEnum {
           FieldDescriptorProto.newBuilder()
               .setName("floatvalue")
               .setNumber(4)
-              .setType(Type.TYPE_FLOAT)
+              .setType(Type.TYPE_DOUBLE)
               .setLabel(Label.LABEL_OPTIONAL)
               .build())
       .addField(
@@ -268,8 +268,8 @@ enum TestEnum {
     baseRecord =
         new GenericRecordBuilder(BASE_SCHEMA)
             .set("bytesValue", BYTES)
-            .set("int32Value", (int) 3)
-            .set("int64Value", (long) 4)
+            .set("intValue", (int) 3)
+            .set("longValue", (long) 4)
             .set("floatValue", (float) 3.14)
             .set("doubleValue", (double) 2.68)
             .set("stringValue", "I am a string. Hear me roar.")
@@ -300,9 +300,9 @@ enum TestEnum {
     baseProtoExpectedFields =
         ImmutableMap.<String, Object>builder()
             .put("bytesvalue", ByteString.copyFrom(BYTES))
-            .put("int32value", (int) 3)
-            .put("int64value", (long) 4)
-            .put("floatvalue", (float) 3.14)
+            .put("intvalue", (long) 3)
+            .put("longvalue", (long) 4)
+            .put("floatvalue", (double) 3.14)
             .put("doublevalue", (double) 2.68)
             .put("stringvalue", "I am a string. Hear me roar.")
             .put("booleanvalue", true)
@@ -333,7 +333,9 @@ enum TestEnum {
   void validateDescriptorAgainstSchema(Schema originalSchema, DescriptorProto schemaProto) {
     DescriptorProto descriptor =
-        AvroGenericRecordToStorageApiProto.descriptorSchemaFromAvroSchema(originalSchema);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(originalSchema),
+            true);
     Map<String, Type> types =
         descriptor.getFieldList().stream()
             .collect(
@@ -375,7 +377,8 @@ public void testDescriptorFromSchemaLogicalTypes() {
   @Test
   public void testNestedFromSchema() {
     DescriptorProto descriptor =
-        AvroGenericRecordToStorageApiProto.descriptorSchemaFromAvroSchema(NESTED_SCHEMA);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true);
     Map<String, Type> expectedBaseTypes =
         BASE_SCHEMA_PROTO.getFieldList().stream()
             .collect(
@@ -430,7 +433,8 @@ private void assertBaseRecord(DynamicMessage msg, Map<String, Object> expectedFi
   @Test
   public void testMessageFromGenericRecord() throws Exception {
     Descriptors.Descriptor descriptor =
-        AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(NESTED_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true);
     DynamicMessage msg =
         AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, nestedRecord);
@@ -446,7 +450,9 @@ public void testMessageFromGenericRecord() throws Exception {
   @Test
   public void testMessageFromGenericRecordLogicalTypes() throws Exception {
     Descriptors.Descriptor descriptor =
-        AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(LOGICAL_TYPES_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(LOGICAL_TYPES_SCHEMA),
+            true);
     DynamicMessage msg =
         AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, logicalTypesRecord);
From ea52993f46df1cb0b5c4b64c879af674125947f6 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Wed, 30 Nov 2022 19:31:42 -0800
Subject: [PATCH 4/8] spotless and checkstyle checks fixes

---
 .../AvroGenericRecordToStorageApiProto.java   |  4 +++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 15 +++++++++------
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index a42614e01ec5..60e91088a2c2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -82,7 +82,9 @@ public class AvroGenericRecordToStorageApiProto {
           .put(Schema.Type.INT, o -> Long.valueOf((int) o))
           .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
           .put(Schema.Type.LONG, Functions.identity())
-          .put(Schema.Type.FLOAT, o-> Double.valueOf(Float.valueOf((float)o).toString()).doubleValue())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
           .put(Schema.Type.DOUBLE, Function.identity())
           .put(Schema.Type.STRING, Function.identity())
           .put(Schema.Type.BOOLEAN, Function.identity())
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 050f87e16257..0efacfcfdc08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -540,13 +540,15 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
+      input -> input;
 
   /**
    * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
    * PCollection<GenericRecord>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<GenericRecord, GenericRecord> GENERIC_RECORD_IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
+      GENERIC_RECORD_IDENTITY_FORMATTER = input -> input.getElement();
 
   static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
       GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
@@ -1933,16 +1935,17 @@ public static <T> Write<T> write() {
   public static Write<TableRow> writeTableRows() {
     return BigQueryIO.write().withFormatFunction(TABLE_ROW_IDENTITY_FORMATTER);
   }
-
+
   /**
-   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
    * GenericRecords} to a BigQuery table.
    *
    * <p>It is recommended to instead use {@link #write} with {@link
+   * Write#withFormatFunction(SerializableFunction)}.
    */
-  public static Write<GenericRecord> writeTableRows() {
-    return BigQueryIO.write().withFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
+  public static Write<GenericRecord> writeGenericRecords() {
+    return BigQueryIO.write()
+        .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
   }
 
   /** Implementation of {@link #write}. */

From 962cf510600a4947e8785a55305d7493141094b6 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Wed, 30 Nov 2022 22:27:30 -0800
Subject: [PATCH 5/8] spotless and checkstyle checks fixes

---
 .../java/org/apache/beam/sdk/schemas/utils/AvroUtils.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 371e07187d6a..0f71eb7a5eb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -150,9 +150,7 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  /**
-   * Unwrap an AVRO schema into the base type an whether it is nullable.
-   */
+  /** Unwrap an AVRO schema into the base type an whether it is nullable. */
   public static class TypeWithNullability {
     final org.apache.avro.Schema type;
     final boolean nullable;
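With the formatter change in PATCH 4/8 above keyed on AvroWriteRequest, element types other than GenericRecord flow through the same Avro path via a custom format function. A rough sketch (MyEvent, its accessor and bqSchema are invented for illustration; getSchema() and getElement() are the AvroWriteRequest accessors used by the tests in this series):

    // Illustrative only; MyEvent and bqSchema are assumptions.
    BigQueryIO.Write<MyEvent> write =
        BigQueryIO.<MyEvent>write()
            .to("project-id:dataset-id.events")
            .withSchema(bqSchema)
            .withAvroFormatFunction(
                req -> {
                  // AvroWriteRequest exposes both the element and its Avro schema.
                  GenericRecord rec = new GenericData.Record(req.getSchema());
                  rec.put("name", req.getElement().getName());
                  return rec;
                })
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);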
From e3ff8cbdd26d38b3b57bec5db0f68d50e1195c37 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Mon, 5 Dec 2022 19:04:36 -0800
Subject: [PATCH 6/8] addressing more comments

---
 .../AvroGenericRecordToStorageApiProto.java   | 14 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  |  7 +-
 .../sdk/io/gcp/bigquery/BigQueryUtils.java    |  2 -
 ...vroGenericRecordToStorageApiProtoTest.java | 18 +++-
 .../io/gcp/bigquery/BigQueryIOWriteTest.java  | 83 ++++++++++++-------
 5 files changed, 77 insertions(+), 47 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 60e91088a2c2..7becffa6d17f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -25,13 +25,13 @@
 import com.google.protobuf.DynamicMessage;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
@@ -82,9 +82,7 @@ public class AvroGenericRecordToStorageApiProto {
           .put(Schema.Type.INT, o -> Long.valueOf((int) o))
           .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
           .put(Schema.Type.LONG, Functions.identity())
-          .put(
-              Schema.Type.FLOAT,
-              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
+          .put(Schema.Type.FLOAT, o -> Double.parseDouble(Float.valueOf((float) o).toString()))
           .put(Schema.Type.DOUBLE, Function.identity())
           .put(Schema.Type.STRING, Function.identity())
           .put(Schema.Type.BOOLEAN, Function.identity())
@@ -192,14 +190,12 @@ public static DynamicMessage messageFromGenericRecord(
   private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
     @Nullable Schema schema = field.schema();
+    Preconditions.checkNotNull(schema, "Unexpected null schema!");
     TableFieldSchema.Builder builder =
         TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
     Schema elementType = null;
     switch (schema.getType()) {
       case RECORD:
-        if (schema == null) {
-          throw new RuntimeException("Unexpected null schema!");
-        }
         Preconditions.checkState(!schema.getFields().isEmpty());
         builder = builder.setType(TableFieldSchema.Type.STRUCT);
         for (Schema.Field recordField : schema.getFields()) {
@@ -306,12 +302,12 @@ private static Object toProtoValue(
       case RECORD:
         return messageFromGenericRecord(fieldDescriptor.getMessageType(), (GenericRecord) value);
       case ARRAY:
-        List<Object> list = (List<Object>) value;
+        Iterable<Object> iterable = (Iterable<Object>) value;
         @Nullable Schema arrayElementType = avroSchema.getElementType();
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
+        return StreamSupport.stream(iterable.spliterator(), false)
             .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
             .collect(Collectors.toList());
       case UNION:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0efacfcfdc08..534049e4a309 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -541,14 +541,14 @@ public class BigQueryIO {
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
   static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
-      input -> input;
+      SerializableFunctions.identity();;
 
   /**
    * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
    * PCollection<GenericRecord>} directly to BigQueryIO.Write.
    */
   static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
-      GENERIC_RECORD_IDENTITY_FORMATTER = input -> input.getElement();
+      GENERIC_RECORD_IDENTITY_FORMATTER = AvroWriteRequest::getElement;
 
   static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
       GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
@@ -1939,9 +1939,6 @@ public static Write<TableRow> writeTableRows() {
   /**
    * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
    * GenericRecords} to a BigQuery table.
-   *
-   * <p>It is recommended to instead use {@link #write} with {@link
-   * Write#withFormatFunction(SerializableFunction)}.
    */
   public static Write<GenericRecord> writeGenericRecords() {
     return BigQueryIO.write()
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 541ff4c4da59..fbea947d0560 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -64,7 +64,6 @@
 import org.apache.beam.sdk.schemas.Schema.LogicalType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
-import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -266,7 +265,6 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
-          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)
           .build();
 
   private static final String BIGQUERY_MAP_KEY_FIELD_NAME = "key";
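The test changes that follow pin down the unit convention for the two Avro timestamp logical types: both map to a BigQuery TIMESTAMP column, but the encoded long differs by a factor of 1000. For reference, using the same schema-builder calls as those tests:

    // timestamp-millis vs. timestamp-micros: same column type, different units.
    org.apache.avro.Schema millisSchema =
        LogicalTypes.timestampMillis()
            .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));
    org.apache.avro.Schema microsSchema =
        LogicalTypes.timestampMicros()
            .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));

    org.joda.time.Instant now = org.joda.time.Instant.now();
    long millisValue = now.getMillis();         // expected proto value for timestamp-millis
    long microsValue = now.getMillis() * 1000;  // expected proto value for timestamp-micros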
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index 93848a55d4a6..94c58e414251 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -114,6 +114,9 @@ enum TestEnum {
           .name("timestampMicrosAsInstantValue")
           .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
           .noDefault()
+          .name("timestampMillisAsInstantValue")
+          .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
+          .noDefault()
           .name("uuidValue")
           .type(LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)))
           .noDefault()
@@ -232,8 +235,15 @@ enum TestEnum {
               .build())
       .addField(
           FieldDescriptorProto.newBuilder()
-              .setName("uuidvalue")
+              .setName("timestampmillisasinstantvalue")
               .setNumber(6)
+              .setType(Type.TYPE_INT64)
+              .setLabel(Label.LABEL_OPTIONAL)
+              .build())
+      .addField(
+          FieldDescriptorProto.newBuilder()
+              .setName("uuidvalue")
+              .setNumber(7)
               .setType(Type.TYPE_STRING)
               .setLabel(Label.LABEL_OPTIONAL)
               .build())
@@ -294,7 +304,8 @@ enum TestEnum {
           .set("dateValue", now)
           .set("timestampMicrosValue", now.getMillis() * 1000)
           .set("timestampMicrosAsInstantValue", now)
-          .set("timestampMillisValue", now.toDateTime())
+          .set("timestampMillisValue", now.getMillis())
+          .set("timestampMillisAsInstantValue", now)
           .set("uuidValue", uuid)
           .build();
   baseProtoExpectedFields =
@@ -319,6 +330,7 @@ enum TestEnum {
           .put("timestampmicrosvalue", now.getMillis() * 1000)
           .put("timestampmicrosasinstantvalue", now.getMillis() * 1000)
           .put("timestampmillisvalue", now.getMillis())
+          .put("timestampmillisasinstantvalue", now.getMillis())
           .put("uuidvalue", uuid.toString())
           .build();
   nestedRecord =
@@ -455,7 +467,7 @@ public void testMessageFromGenericRecordLogicalTypes() throws Exception {
             true);
     DynamicMessage msg =
         AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, logicalTypesRecord);
-    assertEquals(6, msg.getAllFields().size());
+    assertEquals(7, msg.getAllFields().size());
     assertBaseRecord(msg, logicalTypesProtoExpectedFields);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 565d8859670f..5e3d6050ae8f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1060,40 +1060,51 @@ public static InputRecord create(
   private static final Coder<InputRecord> INPUT_RECORD_CODER =
       SerializableCoder.of(InputRecord.class);
 
-  @Test
-  public void testWriteAvro() throws Exception {
-    // only streaming inserts don't support avro types
-    assumeTrue(!useStreaming);
+  public void runTestWriteAvro(boolean schemaFromView) throws Exception {
+    String tableName = "project-id:dataset-id.table-id";
+    BigQueryIO.Write<InputRecord> bqWrite =
+        BigQueryIO.<InputRecord>write()
+            .to(tableName)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withTestServices(fakeBqServices)
+            .withAvroFormatFunction(
+                r -> {
+                  GenericRecord rec = new GenericData.Record(r.getSchema());
+                  InputRecord i = r.getElement();
+                  rec.put("strval", i.strVal());
+                  rec.put("longval", i.longVal());
+                  rec.put("doubleval", i.doubleVal());
+                  rec.put("instantval", i.instantVal().getMillis() * 1000);
+                  return rec;
+                })
+            .withoutValidation();
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("strval").setType("STRING"),
+                    new TableFieldSchema().setName("longval").setType("INTEGER"),
+                    new TableFieldSchema().setName("doubleval").setType("FLOAT"),
+                    new TableFieldSchema().setName("instantval").setType("TIMESTAMP")));
+    if (schemaFromView) {
+      bqWrite =
+          bqWrite.withSchemaFromView(
+              p.apply(
+                      "CreateTableSchemaString",
+                      Create.of(KV.of(tableName, BigQueryHelpers.toJsonString(tableSchema))))
+                  .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+                  .apply(View.asMap()));
+    } else {
+      bqWrite = bqWrite.withSchema(tableSchema);
+    }
     p.apply(
             Create.of(
                     InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")),
                     InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z")))
                 .withCoder(INPUT_RECORD_CODER))
-        .apply(
-            BigQueryIO.<InputRecord>write()
-                .to("project-id:dataset-id.table-id")
-                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-                .withSchema(
-                    new TableSchema()
-                        .setFields(
-                            ImmutableList.of(
-                                new TableFieldSchema().setName("strval").setType("STRING"),
-                                new TableFieldSchema().setName("longval").setType("INTEGER"),
-                                new TableFieldSchema().setName("doubleval").setType("FLOAT"),
-                                new TableFieldSchema().setName("instantval").setType("TIMESTAMP"))))
-                .withTestServices(fakeBqServices)
-                .withAvroFormatFunction(
-                    r -> {
-                      GenericRecord rec = new GenericData.Record(r.getSchema());
-                      InputRecord i = r.getElement();
-                      rec.put("strval", i.strVal());
-                      rec.put("longval", i.longVal());
-                      rec.put("doubleval", i.doubleVal());
-                      rec.put("instantval", i.instantVal().getMillis() * 1000);
-                      return rec;
-                    })
-                .withoutValidation());
+        .apply(bqWrite);
+
     p.run();
 
     assertThat(
@@ -1119,6 +1130,22 @@ public void testWriteAvro() throws Exception {
             : "2019-02-01 00:00:00 UTC")));
   }
 
+  @Test
+  public void testWriteAvro() throws Exception {
+    // only streaming inserts don't support avro types
+    assumeTrue(!useStreaming);
+
+    runTestWriteAvro(false);
+  }
+
+  @Test
+  public void testWriteAvroWithSchemaFromView() throws Exception {
+    // only streaming inserts don't support avro types
+    assumeTrue(useStorageApi);
+
+    runTestWriteAvro(true);
+  }
+
   @Test
   public void testWriteAvroWithCustomWriter() throws Exception {
     assumeTrue(!useStorageApi);

From 33c7974d3a864fb90f6d25535e8c7715b43627ae Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Tue, 31 Jan 2023 14:58:55 -0800
Subject: [PATCH 7/8] merge from master, fix tests and validations

---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 9 ---------
 .../StorageApiDynamicDestinationsGenericRecord.java      | 9 ++++++++-
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4e0cf25f601f..6f4e838d7780 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2904,15 +2904,6 @@ public WriteResult expand(PCollection<T> input) {
             !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
       }
 
-      if (method != Write.Method.FILE_LOADS) {
-        // we only support writing avro for FILE_LOADS
-        checkArgument(
-            getAvroRowWriterFactory() == null,
-            "Writing avro formatted data is only supported for FILE_LOADS, however "
-                + "the method was %s",
-            method);
-      }
-
       if (input.isBounded() == IsBounded.BOUNDED) {
         checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index e4ba233e78ce..020ea70df539 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -67,11 +67,12 @@ class GenericRecordConverter implements MessageConverter<T> {
     }
 
     @Override
+    @SuppressWarnings("nullness")
     public StorageApiWritePayload toMessage(T element) {
       Message msg =
           AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
               descriptor, toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)));
-      return new AutoValue_StorageApiWritePayload(msg.toByteArray());
+      return new AutoValue_StorageApiWritePayload(msg.toByteArray(), null);
     }
 
     @Override
@@ -84,5 +85,11 @@ public TableRow toTableRow(T element) {
     public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
       return protoTableSchema;
     }
+
+    @Override
+    public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired)
+        throws Exception {
+      throw new RuntimeException("Not supported");
+    }
   }
 }
From fd50138de870eb33ea6fe964fa7adb3e542026b3 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Wed, 15 Feb 2023 20:45:37 -0500
Subject: [PATCH 8/8] copying the changes in AvroUtils located in core module
 to the avro extensions module

---
 .../extensions/avro/schemas/utils/AvroUtils.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index ef014af53035..fcd8c11089b8 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -152,10 +152,14 @@ public class AvroUtils {
   }
 
   // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
+  public static class TypeWithNullability {
     public final org.apache.avro.Schema type;
     public final boolean nullable;
 
+    public static TypeWithNullability create(org.apache.avro.Schema avroSchema) {
+      return new TypeWithNullability(avroSchema);
+    }
+
     TypeWithNullability(org.apache.avro.Schema avroSchema) {
       if (avroSchema.getType() == Type.UNION) {
         List<org.apache.avro.Schema> types = avroSchema.getTypes();
@@ -184,6 +188,14 @@ static class TypeWithNullability {
         nullable = false;
       }
     }
+
+    public Boolean isNullable() {
+      return nullable;
+    }
+
+    public org.apache.avro.Schema getType() {
+      return type;
+    }
   }
 
   /** Wrapper for fixed byte fields. */
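A short sketch of what the now-public helper does with a nullable union (the schema literal here is an assumption for illustration; create(), getType() and isNullable() are the accessors added above):

    // ["null", "string"] unwraps to base type STRING with nullable == true.
    org.apache.avro.Schema nullableString =
        org.apache.avro.SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
    AvroUtils.TypeWithNullability unwrapped = AvroUtils.TypeWithNullability.create(nullableString);
    boolean nullable = unwrapped.isNullable();          // true
    org.apache.avro.Schema base = unwrapped.getType();  // Schema of type STRING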