From b6afd4a0eebd3875b1935366ed111870f08df79b Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Thu, 19 Sep 2024 23:24:52 -0700 Subject: [PATCH 1/6] implemented support for maps and array of maps for BigQuery StorageWrite API for Beam Row --- .../bigquery/BeamRowToStorageApiProto.java | 68 +++++++++++++++++-- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 2 +- .../BeamRowToStorageApiProtoTest.java | 28 +++++++- .../io/gcp/bigquery/BigQueryUtilsTest.java | 12 ++++ 4 files changed, 101 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index 4275125ef16d..43ffa0ee66e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -30,11 +30,13 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.Schema; @@ -243,7 +245,19 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { builder = builder.setType(type); break; case MAP: - throw new RuntimeException("Map types not supported by BigQuery."); + @Nullable FieldType keyType = field.getType().getMapKeyType(); + @Nullable FieldType valueType = field.getType().getMapValueType(); + if (keyType == null || valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + + builder = + builder + .setType(TableFieldSchema.Type.STRUCT) + .addFields(fieldDescriptorFromBeamField(Field.of("key", keyType))) + .addFields(fieldDescriptorFromBeamField(Field.of("value", valueType))) + .setMode(TableFieldSchema.Mode.REPEATED); + break; default: @Nullable TableFieldSchema.Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName()); @@ -272,6 +286,8 @@ private static Object messageValueFromRowValue( if (value == null) { if (fieldDescriptor.isOptional()) { return null; + } else if (fieldDescriptor.isRepeated()) { + return Collections.emptyList(); } else { throw new IllegalArgumentException( "Received null value for non-nullable field " + fieldDescriptor.getName()); @@ -291,9 +307,17 @@ private static Object toProtoValue( if (arrayElementType == null) { throw new RuntimeException("Unexpected null element type!"); } - return list.stream() - .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v)) - .collect(Collectors.toList()); + boolean shouldFlatMap = + arrayElementType.getTypeName().isCollectionType() + || arrayElementType.getTypeName().isMapType(); + + Stream valueStream = + list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v)); + + if (shouldFlatMap) { + valueStream = valueStream.flatMap(vs -> ((List) vs).stream()); + } + return valueStream.collect(Collectors.toList()); case ITERABLE: Iterable iterable = (Iterable) value; @Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType(); @@ -304,7 +328,19 @@ private static Object toProtoValue( .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v)) .collect(Collectors.toList()); case MAP: - throw new RuntimeException("Map types not supported by BigQuery."); + Map map = (Map) value; + @Nullable FieldType keyType = beamFieldType.getMapKeyType(); + @Nullable FieldType valueType = beamFieldType.getMapValueType(); + if (keyType == null || valueType == null) { + throw new RuntimeException("Unexpected null element type!"); + } + + return map.entrySet().stream() + .map( + (Map.Entry entry) -> + mapEntryToProtoValue( + fieldDescriptor.getMessageType(), keyType, valueType, entry)) + .collect(Collectors.toList()); default: return scalarToProtoValue(beamFieldType, value); } @@ -334,6 +370,28 @@ static Object scalarToProtoValue(FieldType beamFieldType, Object value) { } } + static Object mapEntryToProtoValue( + Descriptor descriptor, + FieldType keyFieldType, + FieldType valueFieldType, + Map.Entry entryValue) { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + FieldDescriptor keyFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("key")); + @Nullable Object key = toProtoValue(keyFieldDescriptor, keyFieldType, entryValue.getKey()); + if (key != null) { + builder.setField(keyFieldDescriptor, key); + } + FieldDescriptor valueFieldDescriptor = + Preconditions.checkNotNull(descriptor.findFieldByName("value")); + @Nullable + Object value = toProtoValue(valueFieldDescriptor, valueFieldType, entryValue.getValue()); + if (value != null) { + builder.setField(valueFieldDescriptor, value); + } + return builder.build(); + } + static ByteString serializeBigDecimalToNumeric(BigDecimal o) { return serializeBigDecimal(o, NUMERIC_SCALE, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "Numeric"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 305abad5783a..084f41690a68 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -403,7 +403,7 @@ private static List toTableFieldSchema(Schema schema) { } if (type.getTypeName().isCollectionType()) { type = Preconditions.checkArgumentNotNull(type.getCollectionElementType()); - if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) { + if (type.getTypeName().isCollectionType() && !type.getTypeName().isMapType()) { throw new IllegalArgumentException("Array of collection is not supported in BigQuery."); } field.setMode(Mode.REPEATED.toString()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 5c43688a6efa..6233912b5cc4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; @@ -36,7 +37,9 @@ import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -273,12 +276,14 @@ public class BeamRowToStorageApiProtoTest { .addField("nested", FieldType.row(BASE_SCHEMA).withNullable(true)) .addField("nestedArray", FieldType.array(FieldType.row(BASE_SCHEMA))) .addField("nestedIterable", FieldType.iterable(FieldType.row(BASE_SCHEMA))) + .addField("nestedMap", FieldType.map(FieldType.STRING, FieldType.row(BASE_SCHEMA))) .build(); private static final Row NESTED_ROW = Row.withSchema(NESTED_SCHEMA) .withFieldValue("nested", BASE_ROW) .withFieldValue("nestedArray", ImmutableList.of(BASE_ROW, BASE_ROW)) .withFieldValue("nestedIterable", ImmutableList.of(BASE_ROW, BASE_ROW)) + .withFieldValue("nestedMap", ImmutableMap.of("key1", BASE_ROW, "key2", BASE_ROW)) .build(); @Test @@ -336,12 +341,12 @@ public void testNestedFromSchema() { .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel)); - assertEquals(3, types.size()); + assertEquals(4, types.size()); Map nestedTypes = descriptor.getNestedTypeList().stream() .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); - assertEquals(3, nestedTypes.size()); + assertEquals(4, nestedTypes.size()); assertEquals(Type.TYPE_MESSAGE, types.get("nested")); assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested")); String nestedTypeName1 = typeNames.get("nested"); @@ -368,6 +373,23 @@ public void testNestedFromSchema() { .collect( Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); assertEquals(expectedBaseTypes, nestedTypes3); + + assertEquals(Type.TYPE_MESSAGE, types.get("nestedmap")); + assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmap")); + String nestedTypeName4 = typeNames.get("nestedmap"); + // expects 2 fields in the nested map, key and value + assertEquals(2, nestedTypes.get(nestedTypeName4).getFieldList().size()); + Supplier> stream = + () -> nestedTypes.get(nestedTypeName4).getFieldList().stream(); + assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("key"))); + assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("value"))); + + Map nestedTypes4 = + nestedTypes.get(nestedTypeName4).getNestedTypeList().stream() + .flatMap(vdesc -> vdesc.getFieldList().stream()) + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + assertEquals(expectedBaseTypes, nestedTypes4); } private void assertBaseRecord(DynamicMessage msg) { @@ -384,7 +406,7 @@ public void testMessageFromTableRow() throws Exception { BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true, false); DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW, null, -1); - assertEquals(3, msg.getAllFields().size()); + assertEquals(4, msg.getAllFields().size()); Map fieldDescriptors = descriptor.getFields().stream() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index e13e4a92a4dc..828d12322d2f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -698,6 +698,18 @@ public void testToTableSchema_map() { assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE)); } + @Test + public void testToTableSchema_map_array() { + TableSchema schema = toTableSchema(MAP_ARRAY_TYPE); + + assertThat(schema.getFields().size(), equalTo(1)); + TableFieldSchema field = schema.getFields().get(0); + assertThat(field.getName(), equalTo("map")); + assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString())); + assertThat(field.getMode(), equalTo(Mode.REPEATED.toString())); + assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE)); + } + @Test public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); From f6ec654d95016acffbcd1341785a9da431a4d2fc Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 20 Sep 2024 11:14:11 -0700 Subject: [PATCH 2/6] fix cdc test --- .../beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 6233912b5cc4..c655ffd9ab3e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -424,7 +424,7 @@ public void testCdcFields() throws Exception { assertNotNull(descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)); DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW, "UPDATE", 42); - assertEquals(5, msg.getAllFields().size()); + assertEquals(6, msg.getAllFields().size()); Map fieldDescriptors = descriptor.getFields().stream() From 02cf883d765ad96b22ddb4d81dc1c3642cc16b4b Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Tue, 15 Oct 2024 14:17:05 -0700 Subject: [PATCH 3/6] addressing comments and adding test for arrays and maps particularities. --- .../bigquery/BeamRowToStorageApiProto.java | 57 +++++---- .../BeamRowToStorageApiProtoTest.java | 115 ++++++++++++++++++ 2 files changed, 145 insertions(+), 27 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index 43ffa0ee66e6..c3552548373d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -30,7 +30,6 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -222,7 +221,7 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { case ITERABLE: @Nullable FieldType elementType = field.getType().getCollectionElementType(); if (elementType == null) { - throw new RuntimeException("Unexpected null element type!"); + throw new RuntimeException("Unexpected null element type on " + field.getName()); } Preconditions.checkState( !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(), @@ -247,8 +246,13 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { case MAP: @Nullable FieldType keyType = field.getType().getMapKeyType(); @Nullable FieldType valueType = field.getType().getMapValueType(); - if (keyType == null || valueType == null) { - throw new RuntimeException("Unexpected null element type!"); + if (keyType == null) { + throw new RuntimeException( + "Unexpected null element type for the map's key on " + field.getName()); + } + if (valueType == null) { + throw new RuntimeException( + "Unexpected null element type for the map's value on " + field.getName()); } builder = @@ -287,7 +291,7 @@ private static Object messageValueFromRowValue( if (fieldDescriptor.isOptional()) { return null; } else if (fieldDescriptor.isRepeated()) { - return Collections.emptyList(); + return null; } else { throw new IllegalArgumentException( "Received null value for non-nullable field " + fieldDescriptor.getName()); @@ -302,37 +306,36 @@ private static Object toProtoValue( case ROW: return messageFromBeamRow(fieldDescriptor.getMessageType(), (Row) value, null, -1); case ARRAY: - List list = (List) value; - @Nullable FieldType arrayElementType = beamFieldType.getCollectionElementType(); - if (arrayElementType == null) { - throw new RuntimeException("Unexpected null element type!"); - } - boolean shouldFlatMap = - arrayElementType.getTypeName().isCollectionType() - || arrayElementType.getTypeName().isMapType(); - - Stream valueStream = - list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v)); - - if (shouldFlatMap) { - valueStream = valueStream.flatMap(vs -> ((List) vs).stream()); - } - return valueStream.collect(Collectors.toList()); case ITERABLE: Iterable iterable = (Iterable) value; @Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType(); if (iterableElementType == null) { - throw new RuntimeException("Unexpected null element type!"); + throw new RuntimeException("Unexpected null element type: " + fieldDescriptor.getName()); } - return StreamSupport.stream(iterable.spliterator(), false) - .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v)) - .collect(Collectors.toList()); + // We currently only support maps as non-row or non-scalar element types + // given that BigQuery does not support nested arrays. If the element type is of map type + // we should flatten it given how is being translated (as a list of proto(key, value). + boolean shouldFlattenIterable = iterableElementType.getTypeName().isMapType(); + + Stream iterableValueStream = + StreamSupport.stream(iterable.spliterator(), false) + .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v)); + + if (shouldFlattenIterable) { + iterableValueStream = iterableValueStream.flatMap(vs -> ((List) vs).stream()); + } + + return iterableValueStream.collect(Collectors.toList()); case MAP: Map map = (Map) value; @Nullable FieldType keyType = beamFieldType.getMapKeyType(); @Nullable FieldType valueType = beamFieldType.getMapValueType(); - if (keyType == null || valueType == null) { - throw new RuntimeException("Unexpected null element type!"); + if (keyType == null) { + throw new RuntimeException("Unexpected null for key type: " + fieldDescriptor.getName()); + } + if (valueType == null) { + throw new RuntimeException( + "Unexpected null for value type: " + fieldDescriptor.getName()); } return map.entrySet().stream() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index c655ffd9ab3e..58f6251ba854 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -392,6 +392,63 @@ public void testNestedFromSchema() { assertEquals(expectedBaseTypes, nestedTypes4); } + @Test + public void testMapArrayFromSchemas() { + Schema nestedMapSchemaVariations = + Schema.builder() + .addField( + "nestedArrayOfMaps", + FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING))) + .addField( + "nestedMapNullable", + FieldType.map(FieldType.STRING, FieldType.DOUBLE).withNullable(true)) + .build(); + + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema((nestedMapSchemaVariations)), + true, + false); + + Map types = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType)); + Map typeNames = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName)); + Map typeLabels = + descriptor.getFieldList().stream() + .collect( + Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel)); + + Map nestedTypes = + descriptor.getNestedTypeList().stream() + .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); + assertEquals(2, nestedTypes.size()); + + assertEquals(Type.TYPE_MESSAGE, types.get("nestedarrayofmaps")); + assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarrayofmaps")); + String nestedArrayOfMapsName = typeNames.get("nestedarrayofmaps"); + // expects 2 fields for the nested array of maps, key and value + assertEquals(2, nestedTypes.get(nestedArrayOfMapsName).getFieldList().size()); + Supplier> stream = + () -> nestedTypes.get(nestedArrayOfMapsName).getFieldList().stream(); + assertTrue(stream.get().filter(fdp -> fdp.getName().equals("key")).count() == 1); + assertTrue(stream.get().filter(fdp -> fdp.getName().equals("value")).count() == 1); + + assertEquals(Type.TYPE_MESSAGE, types.get("nestedmapnullable")); + // even though the field is marked as optional in the row we will should see repeated in proto + assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmapnullable")); + String nestedMapNullableName = typeNames.get("nestedmapnullable"); + // expects 2 fields in the nullable maps, key and value + assertEquals(2, nestedTypes.get(nestedMapNullableName).getFieldList().size()); + stream = () -> nestedTypes.get(nestedMapNullableName).getFieldList().stream(); + assertTrue(stream.get().filter(fdp -> fdp.getName().equals("key")).count() == 1); + assertTrue(stream.get().filter(fdp -> fdp.getName().equals("value")).count() == 1); + } + private void assertBaseRecord(DynamicMessage msg) { Map recordFields = msg.getAllFields().entrySet().stream() @@ -415,6 +472,64 @@ public void testMessageFromTableRow() throws Exception { assertBaseRecord(nestedMsg); } + @Test + public void testMessageFromTableRowForArraysAndMaps() throws Exception { + Schema nestedMapSchemaVariations = + Schema.builder() + .addField( + "nestedArrayOfMaps", + FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING))) + .addField("nestedArrayNullable", FieldType.array(FieldType.STRING).withNullable(true)) + .addField("nestedMap", FieldType.map(FieldType.STRING, FieldType.STRING)) + .addField( + "nestedMapNullable", + FieldType.map(FieldType.STRING, FieldType.DOUBLE).withNullable(true)) + .build(); + + Row nestedRow = + Row.withSchema(nestedMapSchemaVariations) + .withFieldValue( + "nestedArrayOfMaps", ImmutableList.of(ImmutableMap.of("key1", "value1"))) + .withFieldValue("nestedArrayNullable", null) + .withFieldValue("nestedMap", ImmutableMap.of("key1", "value1")) + .withFieldValue("nestedMapNullable", null) + .build(); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(nestedMapSchemaVariations), + true, + false); + DynamicMessage msg = + BeamRowToStorageApiProto.messageFromBeamRow(descriptor, nestedRow, null, -1); + + Map fieldDescriptors = + descriptor.getFields().stream() + .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity())); + + DynamicMessage nestedArrayOfMapEntryMsg = + (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedarrayofmaps"), 0); + String value = + (String) + nestedArrayOfMapEntryMsg.getField( + fieldDescriptors + .get("nestedarrayofmaps") + .getMessageType() + .findFieldByName("value")); + assertEquals("value1", value); + + DynamicMessage nestedMapEntryMsg = + (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedmap"), 0); + value = + (String) + nestedMapEntryMsg.getField( + fieldDescriptors.get("nestedmap").getMessageType().findFieldByName("value")); + assertEquals("value1", value); + + assertTrue(msg.getRepeatedFieldCount(fieldDescriptors.get("nestedarraynullable")) == 0); + assertTrue(msg.getRepeatedFieldCount(fieldDescriptors.get("nestedmapnullable")) == 0); + } + @Test public void testCdcFields() throws Exception { Descriptor descriptor = From 479d8bbf32276cb2ba4988fa8b1ca92670cd96ca Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Tue, 15 Oct 2024 14:50:55 -0700 Subject: [PATCH 4/6] merging from master --- .../beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index c3552548373d..c778f4e3dca1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -30,6 +30,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -291,7 +292,7 @@ private static Object messageValueFromRowValue( if (fieldDescriptor.isOptional()) { return null; } else if (fieldDescriptor.isRepeated()) { - return null; + return Collections.emptyList(); } else { throw new IllegalArgumentException( "Received null value for non-nullable field " + fieldDescriptor.getName()); From df8f7c240793c4f78ef2ca8400f1c192c7c00e51 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Wed, 16 Oct 2024 11:48:58 -0700 Subject: [PATCH 5/6] including tests to validate behavior for multimaps and iterables of maps as well. --- .../BeamRowToStorageApiProtoTest.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 58f6251ba854..0b92ec3f5724 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -36,6 +36,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -479,8 +480,14 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { .addField( "nestedArrayOfMaps", FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING))) + .addField( + "nestedIterableOfMaps", + FieldType.iterable(FieldType.map(FieldType.STRING, FieldType.STRING))) .addField("nestedArrayNullable", FieldType.array(FieldType.STRING).withNullable(true)) .addField("nestedMap", FieldType.map(FieldType.STRING, FieldType.STRING)) + .addField( + "nestedMultiMap", + FieldType.map(FieldType.STRING, FieldType.iterable(FieldType.STRING))) .addField( "nestedMapNullable", FieldType.map(FieldType.STRING, FieldType.DOUBLE).withNullable(true)) @@ -489,9 +496,15 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { Row nestedRow = Row.withSchema(nestedMapSchemaVariations) .withFieldValue( - "nestedArrayOfMaps", ImmutableList.of(ImmutableMap.of("key1", "value1"))) + "nestedArrayOfMaps", ImmutableList.of(ImmutableMap.of("arraykey1", "arrayvalue1"))) + .withFieldValue( + "nestedIterableOfMaps", + ImmutableList.of(ImmutableMap.of("iterablekey1", "iterablevalue1"))) .withFieldValue("nestedArrayNullable", null) .withFieldValue("nestedMap", ImmutableMap.of("key1", "value1")) + .withFieldValue( + "nestedMultiMap", + ImmutableMap.of("multikey1", ImmutableList.of("multivalue1", "multivalue2"))) .withFieldValue("nestedMapNullable", null) .build(); @@ -516,7 +529,18 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { .get("nestedarrayofmaps") .getMessageType() .findFieldByName("value")); - assertEquals("value1", value); + assertEquals("arrayvalue1", value); + + DynamicMessage nestedIterableOfMapEntryMsg = + (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestediterableofmaps"), 0); + value = + (String) + nestedIterableOfMapEntryMsg.getField( + fieldDescriptors + .get("nestediterableofmaps") + .getMessageType() + .findFieldByName("value")); + assertEquals("iterablevalue1", value); DynamicMessage nestedMapEntryMsg = (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedmap"), 0); @@ -526,6 +550,15 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { fieldDescriptors.get("nestedmap").getMessageType().findFieldByName("value")); assertEquals("value1", value); + DynamicMessage nestedMultiMapEntryMsg = + (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedmultimap"), 0); + List values = + (List) + nestedMultiMapEntryMsg.getField( + fieldDescriptors.get("nestedmultimap").getMessageType().findFieldByName("value")); + assertTrue(values.size() == 2); + assertEquals("multivalue1", values.get(0)); + assertTrue(msg.getRepeatedFieldCount(fieldDescriptors.get("nestedarraynullable")) == 0); assertTrue(msg.getRepeatedFieldCount(fieldDescriptors.get("nestedmapnullable")) == 0); } From 743698a329b58bb725b0a41e1e92192d346c2682 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Thu, 17 Oct 2024 13:07:15 -0700 Subject: [PATCH 6/6] removing the flattening of nested container types and adding a check for array not being supported --- .../bigquery/BeamRowToStorageApiProto.java | 29 ++++------ .../sdk/io/gcp/bigquery/BigQueryUtils.java | 2 +- .../BeamRowToStorageApiProtoTest.java | 58 +++++-------------- 3 files changed, 29 insertions(+), 60 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index c778f4e3dca1..d7ca787feea3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -31,12 +31,10 @@ import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.Schema; @@ -224,9 +222,16 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { if (elementType == null) { throw new RuntimeException("Unexpected null element type on " + field.getName()); } + TypeName containedTypeName = + Preconditions.checkNotNull( + elementType.getTypeName(), + "Null type name found in contained type at " + field.getName()); Preconditions.checkState( - !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(), - "Nested arrays not supported by BigQuery."); + !(containedTypeName.isCollectionType() || containedTypeName.isMapType()), + "Nested container types are not supported by BigQuery. Field " + + field.getName() + + " contains a type " + + containedTypeName.name()); TableFieldSchema elementFieldSchema = fieldDescriptorFromBeamField(Field.of(field.getName(), elementType)); builder = builder.setType(elementFieldSchema.getType()); @@ -313,20 +318,10 @@ private static Object toProtoValue( if (iterableElementType == null) { throw new RuntimeException("Unexpected null element type: " + fieldDescriptor.getName()); } - // We currently only support maps as non-row or non-scalar element types - // given that BigQuery does not support nested arrays. If the element type is of map type - // we should flatten it given how is being translated (as a list of proto(key, value). - boolean shouldFlattenIterable = iterableElementType.getTypeName().isMapType(); - Stream iterableValueStream = - StreamSupport.stream(iterable.spliterator(), false) - .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v)); - - if (shouldFlattenIterable) { - iterableValueStream = iterableValueStream.flatMap(vs -> ((List) vs).stream()); - } - - return iterableValueStream.collect(Collectors.toList()); + return StreamSupport.stream(iterable.spliterator(), false) + .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v)) + .collect(Collectors.toList()); case MAP: Map map = (Map) value; @Nullable FieldType keyType = beamFieldType.getMapKeyType(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index de979685fc0a..b4d110f90fe2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -412,7 +412,7 @@ private static List toTableFieldSchema(Schema schema) { } if (type.getTypeName().isCollectionType()) { type = Preconditions.checkArgumentNotNull(type.getCollectionElementType()); - if (type.getTypeName().isCollectionType() && !type.getTypeName().isMapType()) { + if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) { throw new IllegalArgumentException("Array of collection is not supported in BigQuery."); } field.setMode(Mode.REPEATED.toString()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 0cd90c0ba2c4..d8c580a0cd18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -405,12 +405,12 @@ public void testNestedFromSchema() { } @Test - public void testMapArrayFromSchemas() { + public void testParticularMapsFromSchemas() { Schema nestedMapSchemaVariations = Schema.builder() .addField( - "nestedArrayOfMaps", - FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING))) + "nestedMultiMap", + FieldType.map(FieldType.STRING, FieldType.array(FieldType.STRING))) .addField( "nestedMapNullable", FieldType.map(FieldType.STRING, FieldType.DOUBLE).withNullable(true)) @@ -440,15 +440,22 @@ public void testMapArrayFromSchemas() { .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity())); assertEquals(2, nestedTypes.size()); - assertEquals(Type.TYPE_MESSAGE, types.get("nestedarrayofmaps")); - assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarrayofmaps")); - String nestedArrayOfMapsName = typeNames.get("nestedarrayofmaps"); + assertEquals(Type.TYPE_MESSAGE, types.get("nestedmultimap")); + assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmultimap")); + String nestedMultiMapName = typeNames.get("nestedmultimap"); // expects 2 fields for the nested array of maps, key and value - assertEquals(2, nestedTypes.get(nestedArrayOfMapsName).getFieldList().size()); + assertEquals(2, nestedTypes.get(nestedMultiMapName).getFieldList().size()); Supplier> stream = - () -> nestedTypes.get(nestedArrayOfMapsName).getFieldList().stream(); + () -> nestedTypes.get(nestedMultiMapName).getFieldList().stream(); assertTrue(stream.get().filter(fdp -> fdp.getName().equals("key")).count() == 1); assertTrue(stream.get().filter(fdp -> fdp.getName().equals("value")).count() == 1); + assertTrue( + stream + .get() + .filter(fdp -> fdp.getName().equals("value")) + .filter(fdp -> fdp.getLabel().equals(Label.LABEL_REPEATED)) + .count() + == 1); assertEquals(Type.TYPE_MESSAGE, types.get("nestedmapnullable")); // even though the field is marked as optional in the row we will should see repeated in proto @@ -488,12 +495,6 @@ public void testMessageFromTableRow() throws Exception { public void testMessageFromTableRowForArraysAndMaps() throws Exception { Schema nestedMapSchemaVariations = Schema.builder() - .addField( - "nestedArrayOfMaps", - FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING))) - .addField( - "nestedIterableOfMaps", - FieldType.iterable(FieldType.map(FieldType.STRING, FieldType.STRING))) .addField("nestedArrayNullable", FieldType.array(FieldType.STRING).withNullable(true)) .addField("nestedMap", FieldType.map(FieldType.STRING, FieldType.STRING)) .addField( @@ -506,11 +507,6 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { Row nestedRow = Row.withSchema(nestedMapSchemaVariations) - .withFieldValue( - "nestedArrayOfMaps", ImmutableList.of(ImmutableMap.of("arraykey1", "arrayvalue1"))) - .withFieldValue( - "nestedIterableOfMaps", - ImmutableList.of(ImmutableMap.of("iterablekey1", "iterablevalue1"))) .withFieldValue("nestedArrayNullable", null) .withFieldValue("nestedMap", ImmutableMap.of("key1", "value1")) .withFieldValue( @@ -531,31 +527,9 @@ public void testMessageFromTableRowForArraysAndMaps() throws Exception { descriptor.getFields().stream() .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity())); - DynamicMessage nestedArrayOfMapEntryMsg = - (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedarrayofmaps"), 0); - String value = - (String) - nestedArrayOfMapEntryMsg.getField( - fieldDescriptors - .get("nestedarrayofmaps") - .getMessageType() - .findFieldByName("value")); - assertEquals("arrayvalue1", value); - - DynamicMessage nestedIterableOfMapEntryMsg = - (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestediterableofmaps"), 0); - value = - (String) - nestedIterableOfMapEntryMsg.getField( - fieldDescriptors - .get("nestediterableofmaps") - .getMessageType() - .findFieldByName("value")); - assertEquals("iterablevalue1", value); - DynamicMessage nestedMapEntryMsg = (DynamicMessage) msg.getRepeatedField(fieldDescriptors.get("nestedmap"), 0); - value = + String value = (String) nestedMapEntryMsg.getField( fieldDescriptors.get("nestedmap").getMessageType().findFieldByName("value"));