From 5abd26b6cea9f8c6b62c6bbdbe1fa77bf0e0ff22 Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 9 Jun 2021 15:58:08 +1000 Subject: [PATCH 01/11] ProtoSchemaTranslator now orders oneof fields in the resultant beam schema in accordance with their location in the protobuf definition --- .../protobuf/ProtoSchemaTranslator.java | 16 ++++++++++++---- .../extensions/protobuf/TestProtoSchemas.java | 10 +++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java index 91eb1bd7da53..7f6d4be0a1d4 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java @@ -157,6 +157,7 @@ static Schema getSchema(Class clazz) { static Schema getSchema(Descriptors.Descriptor descriptor) { Set oneOfFields = Sets.newHashSet(); + Map oneOfFieldLocationMap = Maps.newHashMap(); List fields = Lists.newArrayListWithCapacity(descriptor.getFields().size()); for (OneofDescriptor oneofDescriptor : descriptor.getOneofs()) { List subFields = Lists.newArrayListWithCapacity(oneofDescriptor.getFieldCount()); @@ -172,17 +173,24 @@ static Schema getSchema(Descriptors.Descriptor descriptor) { enumIds.putIfAbsent(fieldDescriptor.getName(), fieldDescriptor.getNumber()) == null); } FieldType oneOfType = FieldType.logicalType(OneOfType.create(subFields, enumIds)); - fields.add(Field.of(oneofDescriptor.getName(), oneOfType)); + oneOfFieldLocationMap.put( + oneofDescriptor.getFields().get(0).getNumber(), + Field.of(oneofDescriptor.getName(), oneOfType)); } for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) { - if (!oneOfFields.contains(fieldDescriptor.getNumber())) { + int fieldDescriptorNumber = fieldDescriptor.getNumber(); + if (!oneOfFields.contains(fieldDescriptorNumber)) { // Store proto field number in metadata. FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor); fields.add( - withFieldNumber( - Field.of(fieldDescriptor.getName(), fieldType), fieldDescriptor.getNumber()) + withFieldNumber(Field.of(fieldDescriptor.getName(), fieldType), fieldDescriptorNumber) .withOptions(getFieldOptions(fieldDescriptor))); + } else if (oneOfFieldLocationMap.containsKey(fieldDescriptorNumber)) { + Field oneOfField = oneOfFieldLocationMap.get(fieldDescriptorNumber); + if (oneOfField != null) { + fields.add(oneOfField); + } } } return Schema.builder() diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 6b9bdf8a81f5..2015084c0812 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -384,8 +384,8 @@ static Schema.Options withTypeName(String typeName) { static final OneOfType ONE_OF_TYPE = OneOfType.create(ONEOF_FIELDS, ONE_OF_ENUM_MAP); static final Schema ONEOF_SCHEMA = Schema.builder() - .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE)) .addField(withFieldNumber("place1", FieldType.STRING, 1)) + .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE)) .addField(withFieldNumber("place2", FieldType.INT32, 6)) .setOptions(withTypeName("proto3_schema_messages.OneOf")) .build(); @@ -393,19 +393,19 @@ static Schema.Options withTypeName(String typeName) { // Sample row instances for each OneOf case. static final Row ONEOF_ROW_INT32 = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_int32", 1), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_int32", 1), 0) .build(); static final Row ONEOF_ROW_BOOL = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_bool", true), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_bool", true), 0) .build(); static final Row ONEOF_ROW_STRING = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_string", "foo"), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_string", "foo"), 0) .build(); static final Row ONEOF_ROW_PRIMITIVE = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0) .build(); // Sample proto instances for each oneof case. From 89f31f5f6f770dcf06cb60c127b1b775cc06f32c Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 18 Aug 2021 20:22:21 +1000 Subject: [PATCH 02/11] add reverse order protobuf --- .../protobuf/ProtoSchemaTranslatorTest.java | 7 ++++++ .../extensions/protobuf/TestProtoSchemas.java | 24 +++++++++++++++++++ .../test/proto/proto3_schema_messages.proto | 11 +++++++++ 3 files changed, 42 insertions(+) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java index 9d473bf1f35b..7e8f472f69db 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java @@ -83,6 +83,13 @@ public void testOneOfSchema() { ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.OneOf.class)); } + @Test + public void testReversedOneOfSchema() { + assertEquals( + TestProtoSchemas.REVERSED_ONEOF_SCHEMA, + ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.ReversedOneOf.class)); + } + @Test public void testNestedOneOfSchema() { assertEquals( diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 2015084c0812..958f1dcdf91e 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -433,6 +433,30 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.OuterOneOf")) .build(); + // The schema for the ReversedOneOf proto. + // The schema for the OneOf proto. + private static final List ONEOF_FIELDS_REVERSED = + ImmutableList.of( + withFieldNumber("oneof_int32", FieldType.INT32, 5), + withFieldNumber("oneof_bool", FieldType.BOOLEAN, 4), + withFieldNumber("oneof_string", FieldType.STRING, 3), + withFieldNumber("oneof_primitive", FieldType.row(PRIMITIVE_SCHEMA), 2)); + + private static final Map ONE_OF_ENUM_MAP_REVERSED = + ONEOF_FIELDS_REVERSED.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + static final OneOfType ONE_OF_TYPE_REVERSED = + OneOfType.create(ONEOF_FIELDS_REVERSED, ONE_OF_ENUM_MAP_REVERSED); + + // static final OneOfType ONE_OF_TYPE = OneOfType.create(ONEOF_FIELDS, ONE_OF_ENUM_MAP); + static final Schema REVERSED_ONEOF_SCHEMA = + Schema.builder() + .addField(withFieldNumber("place1", FieldType.STRING, 6)) + .addField("oneof_reversed", FieldType.logicalType(ONE_OF_TYPE_REVERSED)) + .addField(withFieldNumber("place2", FieldType.INT32, 1)) + .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf")) + .build(); + // A sample instance of the Row. static final Row OUTER_ONEOF_ROW = Row.withSchema(OUTER_ONEOF_SCHEMA) diff --git a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto index a19ea043eeb2..412add37f24f 100644 --- a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto +++ b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto @@ -100,6 +100,17 @@ message OuterOneOf { } } +message ReversedOneOf { + string place1 = 6; + oneof oneof_reversed { + int32 oneof_int32 = 5; + bool oneof_bool = 4; + string oneof_string = 3; + Primitive oneof_primitive = 2; + } + int32 place2 = 1; +} + message EnumMessage { enum Enum { ZERO = 0; From ed9a58dae05886f17f59d4ae47d48e50637fd9d7 Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 18 Aug 2021 21:27:00 +1000 Subject: [PATCH 03/11] add noncontiguous oneof and some renaming --- .../protobuf/ProtoSchemaTranslatorTest.java | 7 +++ .../extensions/protobuf/TestProtoSchemas.java | 54 ++++++++++++++++--- .../test/proto/proto3_schema_messages.proto | 17 ++++++ 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java index 7e8f472f69db..f478a9403bc1 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java @@ -90,6 +90,13 @@ public void testReversedOneOfSchema() { ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.ReversedOneOf.class)); } + @Test + public void testNonContiguousOneOfSchema() { + assertEquals( + TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA, + ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.NonContiguousOneOf.class)); + } + @Test public void testNestedOneOfSchema() { assertEquals( diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 958f1dcdf91e..9dff6175755d 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -434,29 +434,67 @@ static Schema.Options withTypeName(String typeName) { .build(); // The schema for the ReversedOneOf proto. - // The schema for the OneOf proto. - private static final List ONEOF_FIELDS_REVERSED = + private static final List REVERSED_ONEOF_FIELDS = ImmutableList.of( withFieldNumber("oneof_int32", FieldType.INT32, 5), withFieldNumber("oneof_bool", FieldType.BOOLEAN, 4), withFieldNumber("oneof_string", FieldType.STRING, 3), withFieldNumber("oneof_primitive", FieldType.row(PRIMITIVE_SCHEMA), 2)); - private static final Map ONE_OF_ENUM_MAP_REVERSED = - ONEOF_FIELDS_REVERSED.stream() + private static final Map REVERSED_ONE_OF_ENUM_MAP = + REVERSED_ONEOF_FIELDS.stream() .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); - static final OneOfType ONE_OF_TYPE_REVERSED = - OneOfType.create(ONEOF_FIELDS_REVERSED, ONE_OF_ENUM_MAP_REVERSED); + static final OneOfType REVERSED_ONE_OF_TYPE = + OneOfType.create(REVERSED_ONEOF_FIELDS, REVERSED_ONE_OF_ENUM_MAP); - // static final OneOfType ONE_OF_TYPE = OneOfType.create(ONEOF_FIELDS, ONE_OF_ENUM_MAP); static final Schema REVERSED_ONEOF_SCHEMA = Schema.builder() .addField(withFieldNumber("place1", FieldType.STRING, 6)) - .addField("oneof_reversed", FieldType.logicalType(ONE_OF_TYPE_REVERSED)) + .addField("oneof_reversed", FieldType.logicalType(REVERSED_ONE_OF_TYPE)) .addField(withFieldNumber("place2", FieldType.INT32, 1)) .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf")) .build(); + // The schema for the NonContiguousOneOf proto. + private static final List NONCONTIGUOUS_ONE_ONEOF_FIELDS = + ImmutableList.of( + withFieldNumber("oneof_one_int32", FieldType.INT32, 55), + withFieldNumber("oneof_one_bool", FieldType.BOOLEAN, 1), + withFieldNumber("oneof_one_string", FieldType.STRING, 189), + withFieldNumber("oneof_one_primitive", FieldType.row(PRIMITIVE_SCHEMA), 22)); + + private static final List NONCONTIGUOUS_TWO_ONEOF_FIELDS = + ImmutableList.of( + withFieldNumber("oneof_two_first_string", FieldType.STRING, 981), + withFieldNumber("oneof_two_int32", FieldType.INT32, 2), + withFieldNumber("oneof_two_second_string", FieldType.STRING, 44)); + + private static final Map NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP = + NONCONTIGUOUS_ONE_ONEOF_FIELDS.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + + private static final Map NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP = + NONCONTIGUOUS_TWO_ONEOF_FIELDS.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + + static final OneOfType NONCONTIGUOUS_ONE_ONE_OF_TYPE = + OneOfType.create(NONCONTIGUOUS_ONE_ONEOF_FIELDS, NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP); + + static final OneOfType NONCONTIGUOUS_TWO_ONE_OF_TYPE = + OneOfType.create(NONCONTIGUOUS_TWO_ONEOF_FIELDS, NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP); + + static final Schema NONCONTIGUOUS_ONEOF_SCHEMA = + Schema.builder() + .addField(withFieldNumber("place1", FieldType.STRING, 76)) + .addField( + "oneof_non_contiguous_one", FieldType.logicalType(NONCONTIGUOUS_ONE_ONE_OF_TYPE)) + .addField(withFieldNumber("place2", FieldType.INT32, 33)) + .addField( + "oneof_non_contiguous_two", FieldType.logicalType(NONCONTIGUOUS_TWO_ONE_OF_TYPE)) + .addField(withFieldNumber("place3", FieldType.INT32, 63)) + .setOptions(withTypeName("proto3_schema_messages.NonContiguousOneOf")) + .build(); + // A sample instance of the Row. static final Row OUTER_ONEOF_ROW = Row.withSchema(OUTER_ONEOF_SCHEMA) diff --git a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto index 412add37f24f..027486495066 100644 --- a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto +++ b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto @@ -111,6 +111,23 @@ message ReversedOneOf { int32 place2 = 1; } +message NonContiguousOneOf { + string place1 = 76; + oneof oneof_non_contiguous_one { + int32 oneof_one_int32 = 55; + bool oneof_one_bool = 1; + string oneof_one_string = 189; + Primitive oneof_one_primitive = 22; + } + int32 place2 = 33; + oneof oneof_non_contiguous_two { + string oneof_two_first_string = 981; + int32 oneof_two_int32 = 2; + string oneof_two_second_string = 44; + } + int32 place3 = 63; +} + message EnumMessage { enum Enum { ZERO = 0; From dab6cd5c91a3dab5d4422b570ce32b0b6c0a002b Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 18 Aug 2021 22:02:25 +1000 Subject: [PATCH 04/11] Comments and variable renaming --- .../protobuf/ProtoSchemaTranslator.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java index 7f6d4be0a1d4..419f1c429974 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java @@ -156,14 +156,18 @@ static Schema getSchema(Class clazz) { } static Schema getSchema(Descriptors.Descriptor descriptor) { - Set oneOfFields = Sets.newHashSet(); - Map oneOfFieldLocationMap = Maps.newHashMap(); + /* OneOfComponentFields refers to the field number of the protobuf in the protobuf where the component subfields + * are. This is needed to prevent double inclusion of the component fields.*/ + Set oneOfComponentFields = Sets.newHashSet(); + /* OneOfFieldLocation stores the field number of the first field in the OneOf. Using this, we can use the location + of the first field in the OneOf as the location of the entire OneOf.*/ + Map oneOfFieldLocation = Maps.newHashMap(); List fields = Lists.newArrayListWithCapacity(descriptor.getFields().size()); for (OneofDescriptor oneofDescriptor : descriptor.getOneofs()) { List subFields = Lists.newArrayListWithCapacity(oneofDescriptor.getFieldCount()); Map enumIds = Maps.newHashMap(); for (FieldDescriptor fieldDescriptor : oneofDescriptor.getFields()) { - oneOfFields.add(fieldDescriptor.getNumber()); + oneOfComponentFields.add(fieldDescriptor.getNumber()); // Store proto field number in a field option. FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor); subFields.add( @@ -173,21 +177,23 @@ static Schema getSchema(Descriptors.Descriptor descriptor) { enumIds.putIfAbsent(fieldDescriptor.getName(), fieldDescriptor.getNumber()) == null); } FieldType oneOfType = FieldType.logicalType(OneOfType.create(subFields, enumIds)); - oneOfFieldLocationMap.put( + oneOfFieldLocation.put( oneofDescriptor.getFields().get(0).getNumber(), Field.of(oneofDescriptor.getName(), oneOfType)); } for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) { int fieldDescriptorNumber = fieldDescriptor.getNumber(); - if (!oneOfFields.contains(fieldDescriptorNumber)) { + if (!oneOfComponentFields.contains(fieldDescriptorNumber)) { // Store proto field number in metadata. FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor); fields.add( withFieldNumber(Field.of(fieldDescriptor.getName(), fieldType), fieldDescriptorNumber) .withOptions(getFieldOptions(fieldDescriptor))); - } else if (oneOfFieldLocationMap.containsKey(fieldDescriptorNumber)) { - Field oneOfField = oneOfFieldLocationMap.get(fieldDescriptorNumber); + /* Note that descriptor.getFields() returns an iterator in the order of the fields in the .proto file, not + * in field number order. Therefore we can safely insert the OneOfField at the field of its first component.*/ + } else if (oneOfFieldLocation.containsKey(fieldDescriptorNumber)) { + Field oneOfField = oneOfFieldLocation.get(fieldDescriptorNumber); if (oneOfField != null) { fields.add(oneOfField); } From 218b3de11eaa2be1775bad6eae3c781945d8de6a Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 18 Aug 2021 22:27:08 +1000 Subject: [PATCH 05/11] add reversed row tests --- .../protobuf/ProtoMessageSchemaTest.java | 29 ++++++++++ .../extensions/protobuf/TestProtoSchemas.java | 53 +++++++++++++++---- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java index 480ea1d1f858..5a6732489331 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java @@ -51,6 +51,14 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA; @@ -68,6 +76,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -279,6 +288,26 @@ public void testOuterOneOfRowToProto() { assertEquals(OUTER_ONEOF_PROTO, fromRow.apply(OUTER_ONEOF_ROW)); } + @Test + public void testReversedOneOfProtoToRow() { + SerializableFunction toRow = + new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ReversedOneOf.class)); + assertEquals(REVERSED_ONEOF_ROW_INT32, toRow.apply(REVERSED_ONEOF_PROTO_INT32)); + assertEquals(REVERSED_ONEOF_ROW_BOOL, toRow.apply(REVERSED_ONEOF_PROTO_BOOL)); + assertEquals(REVERSED_ONEOF_ROW_STRING, toRow.apply(REVERSED_ONEOF_PROTO_STRING)); + assertEquals(REVERSED_ONEOF_ROW_PRIMITIVE, toRow.apply(REVERSED_ONEOF_PROTO_PRIMITIVE)); + } + + @Test + public void testReversedOneOfRowToProto() { + SerializableFunction fromRow = + new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ReversedOneOf.class)); + assertEquals(REVERSED_ONEOF_PROTO_INT32, fromRow.apply(REVERSED_ONEOF_ROW_INT32)); + assertEquals(REVERSED_ONEOF_PROTO_BOOL, fromRow.apply(REVERSED_ONEOF_ROW_BOOL)); + assertEquals(REVERSED_ONEOF_PROTO_STRING, fromRow.apply(REVERSED_ONEOF_ROW_STRING)); + assertEquals(REVERSED_ONEOF_PROTO_PRIMITIVE, fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE)); + } + private static final EnumerationType ENUM_TYPE = EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3)); private static final Schema ENUM_SCHEMA = diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 9dff6175755d..1ad4e1ca01f8 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed32; import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed64; @@ -433,6 +434,16 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.OuterOneOf")) .build(); + // A sample instance of the Row. + static final Row OUTER_ONEOF_ROW = + Row.withSchema(OUTER_ONEOF_SCHEMA) + .addValues(OUTER_ONEOF_TYPE.createValue("oneof_oneof", ONEOF_ROW_PRIMITIVE)) + .build(); + + // A sample instance of the proto. + static final OuterOneOf OUTER_ONEOF_PROTO = + OuterOneOf.newBuilder().setOneofOneof(ONEOF_PROTO_PRIMITIVE).build(); + // The schema for the ReversedOneOf proto. private static final List REVERSED_ONEOF_FIELDS = ImmutableList.of( @@ -455,6 +466,38 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf")) .build(); + // Sample row instances for each OneOf case. + static final Row REVERSED_ONEOF_ROW_INT32 = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_int32", 1), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_BOOL = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_bool", true), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_STRING = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_string", "foo"), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_PRIMITIVE = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0) + .build(); + + // Sample proto instances for each reversedOneOf case. + static final ReversedOneOf REVERSED_ONEOF_PROTO_INT32 = + ReversedOneOf.newBuilder().setOneofInt32(1).setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_BOOL = + ReversedOneOf.newBuilder().setOneofBool(true).setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_STRING = + ReversedOneOf.newBuilder().setOneofString("foo").setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_PRIMITIVE = + ReversedOneOf.newBuilder() + .setOneofPrimitive(PRIMITIVE_PROTO) + .setPlace1("foo") + .setPlace2(0) + .build(); + // The schema for the NonContiguousOneOf proto. private static final List NONCONTIGUOUS_ONE_ONEOF_FIELDS = ImmutableList.of( @@ -495,16 +538,6 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.NonContiguousOneOf")) .build(); - // A sample instance of the Row. - static final Row OUTER_ONEOF_ROW = - Row.withSchema(OUTER_ONEOF_SCHEMA) - .addValues(OUTER_ONEOF_TYPE.createValue("oneof_oneof", ONEOF_ROW_PRIMITIVE)) - .build(); - - // A sample instance of the proto. - static final OuterOneOf OUTER_ONEOF_PROTO = - OuterOneOf.newBuilder().setOneofOneof(ONEOF_PROTO_PRIMITIVE).build(); - static final Schema WKT_MESSAGE_SCHEMA = Schema.builder() .addField(withFieldNumber("double", FieldType.DOUBLE, 1).withNullable(true)) From 6759183a017fe476bfac5eb1e4c5f8e3f81101a0 Mon Sep 17 00:00:00 2001 From: Reuben Date: Wed, 18 Aug 2021 22:39:39 +1000 Subject: [PATCH 06/11] add noncontiguous tests --- .../protobuf/ProtoMessageSchemaTest.java | 17 ++++++++++++++++ .../extensions/protobuf/TestProtoSchemas.java | 20 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java index 5a6732489331..f5ce63226541 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java @@ -23,6 +23,8 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO; @@ -72,6 +74,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; @@ -308,6 +311,20 @@ public void testReversedOneOfRowToProto() { assertEquals(REVERSED_ONEOF_PROTO_PRIMITIVE, fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE)); } + @Test + public void testNonContiguousOneOfProtoToRow() { + SerializableFunction toRow = + new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(NonContiguousOneOf.class)); + assertEquals(NONCONTIGUOUS_ONEOF_ROW, toRow.apply(NONCONTIGUOUS_ONEOF_PROTO)); + } + + @Test + public void testNonContiguousOneOfRowToProto() { + SerializableFunction fromRow = + new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(NonContiguousOneOf.class)); + assertEquals(NONCONTIGUOUS_ONEOF_PROTO, fromRow.apply(NONCONTIGUOUS_ONEOF_ROW)); + } + private static final EnumerationType ENUM_TYPE = EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3)); private static final Schema ENUM_SCHEMA = diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 1ad4e1ca01f8..8a530b1fe237 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; @@ -538,6 +539,25 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.NonContiguousOneOf")) .build(); + static final Row NONCONTIGUOUS_ONEOF_ROW = + Row.withSchema(NONCONTIGUOUS_ONEOF_SCHEMA) + .addValues( + "foo", + NONCONTIGUOUS_ONE_ONE_OF_TYPE.createValue("oneof_one_int32", 1), + 0, + NONCONTIGUOUS_TWO_ONE_OF_TYPE.createValue("oneof_two_second_string", "bar"), + 343) + .build(); + + static final NonContiguousOneOf NONCONTIGUOUS_ONEOF_PROTO = + NonContiguousOneOf.newBuilder() + .setOneofOneInt32(1) + .setPlace1("foo") + .setPlace2(0) + .setOneofTwoSecondString("bar") + .setPlace3(343) + .build(); + static final Schema WKT_MESSAGE_SCHEMA = Schema.builder() .addField(withFieldNumber("double", FieldType.DOUBLE, 1).withNullable(true)) From 97a458da3ce9fc80dc9e13445b1a86007ac87fc0 Mon Sep 17 00:00:00 2001 From: Reuben Date: Mon, 30 Aug 2021 14:54:07 +1000 Subject: [PATCH 07/11] remove redundant null check --- .../beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java index 419f1c429974..eb840cf651da 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java @@ -192,7 +192,7 @@ static Schema getSchema(Descriptors.Descriptor descriptor) { .withOptions(getFieldOptions(fieldDescriptor))); /* Note that descriptor.getFields() returns an iterator in the order of the fields in the .proto file, not * in field number order. Therefore we can safely insert the OneOfField at the field of its first component.*/ - } else if (oneOfFieldLocation.containsKey(fieldDescriptorNumber)) { + } else { Field oneOfField = oneOfFieldLocation.get(fieldDescriptorNumber); if (oneOfField != null) { fields.add(oneOfField); From 0feff2ee2995afb0a12c9e21bb92c26956543702 Mon Sep 17 00:00:00 2001 From: Reuben Date: Mon, 30 Aug 2021 14:59:49 +1000 Subject: [PATCH 08/11] minor test comment update --- .../apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 8a530b1fe237..40055d05ec69 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -467,7 +467,7 @@ static Schema.Options withTypeName(String typeName) { .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf")) .build(); - // Sample row instances for each OneOf case. + // Sample row instances for each ReversedOneOf case. static final Row REVERSED_ONEOF_ROW_INT32 = Row.withSchema(REVERSED_ONEOF_SCHEMA) .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_int32", 1), 0) @@ -485,7 +485,7 @@ static Schema.Options withTypeName(String typeName) { .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0) .build(); - // Sample proto instances for each reversedOneOf case. + // Sample proto instances for each ReversedOneOf case. static final ReversedOneOf REVERSED_ONEOF_PROTO_INT32 = ReversedOneOf.newBuilder().setOneofInt32(1).setPlace1("foo").setPlace2(0).build(); static final ReversedOneOf REVERSED_ONEOF_PROTO_BOOL = From b9e78293d67513eb5dac3ef6d44e22ee5b0c7227 Mon Sep 17 00:00:00 2001 From: Reuben Date: Tue, 31 Aug 2021 14:07:10 +1000 Subject: [PATCH 09/11] update --- .../beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java index eb840cf651da..ef46b59ced9e 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java @@ -156,7 +156,7 @@ static Schema getSchema(Class clazz) { } static Schema getSchema(Descriptors.Descriptor descriptor) { - /* OneOfComponentFields refers to the field number of the protobuf in the protobuf where the component subfields + /* OneOfComponentFields refers to the field number in the protobuf where the component subfields * are. This is needed to prevent double inclusion of the component fields.*/ Set oneOfComponentFields = Sets.newHashSet(); /* OneOfFieldLocation stores the field number of the first field in the OneOf. Using this, we can use the location From 87e9c650adda316f7d1ee8f7007f4d8e4cc2395d Mon Sep 17 00:00:00 2001 From: Reuben van Ammers Date: Wed, 5 Jan 2022 17:19:19 +1100 Subject: [PATCH 10/11] add reversedonof test --- .../ProtoDynamicMessageSchemaTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java index 9b8f6e37850b..553200d77b85 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java @@ -45,6 +45,15 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_SCHEMA; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA; @@ -65,6 +74,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; @@ -256,6 +266,49 @@ public void testOneOfRowToProto() { assertEquals(ONEOF_PROTO_PRIMITIVE.toString(), fromRow.apply(ONEOF_ROW_PRIMITIVE).toString()); } + @Test + public void testReversedOneOfSchema() { + ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + Schema schema = schemaProvider.getSchema(); + assertEquals(REVERSED_ONEOF_SCHEMA, schema); + } + + @Test + public void testReversedOneOfProtoToRow() throws InvalidProtocolBufferException { + ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + SerializableFunction toRow = schemaProvider.getToRowFunction(); + // equality doesn't work between dynamic messages and other, + // so we compare string representation + assertEquals( + REVERSED_ONEOF_ROW_INT32.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_INT32)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_BOOL.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_BOOL)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_STRING.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_STRING)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_PRIMITIVE.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_PRIMITIVE)).toString()); + } + + @Test + public void testReversedOneOfRowToProto() { + ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + SerializableFunction fromRow = schemaProvider.getFromRowFunction(); + assertEquals( + REVERSED_ONEOF_PROTO_INT32.toString(), fromRow.apply(REVERSED_ONEOF_ROW_INT32).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_BOOL.toString(), fromRow.apply(REVERSED_ONEOF_ROW_BOOL).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_STRING.toString(), + fromRow.apply(REVERSED_ONEOF_ROW_STRING).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_PRIMITIVE.toString(), + fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE).toString()); + } + @Test public void testOuterOneOfSchema() { ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(OuterOneOf.getDescriptor()); From 45e2f469b4e7dba401dc0a888fdf113ad3db6c1e Mon Sep 17 00:00:00 2001 From: Reuben van Ammers Date: Wed, 5 Jan 2022 18:24:32 +1100 Subject: [PATCH 11/11] add noncontiguous oneof test --- .../ProtoDynamicMessageSchemaTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java index 553200d77b85..fa44ed8ca6e8 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java @@ -23,6 +23,9 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO; @@ -70,6 +73,7 @@ import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; @@ -309,6 +313,35 @@ public void testReversedOneOfRowToProto() { fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE).toString()); } + @Test + public void testNonContiguousOneOfSchema() { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + Schema schema = schemaProvider.getSchema(); + assertEquals(NONCONTIGUOUS_ONEOF_SCHEMA, schema); + } + + @Test + public void testNonContiguousOneOfProtoToRow() throws InvalidProtocolBufferException { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + SerializableFunction toRow = schemaProvider.getToRowFunction(); + // equality doesn't work between dynamic messages and other, + // so we compare string representation + assertEquals( + NONCONTIGUOUS_ONEOF_ROW.toString(), + toRow.apply(toDynamic(NONCONTIGUOUS_ONEOF_PROTO)).toString()); + } + + @Test + public void testNonContiguousOneOfRowToProto() { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + SerializableFunction fromRow = schemaProvider.getFromRowFunction(); + assertEquals( + NONCONTIGUOUS_ONEOF_PROTO.toString(), fromRow.apply(NONCONTIGUOUS_ONEOF_ROW).toString()); + } + @Test public void testOuterOneOfSchema() { ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(OuterOneOf.getDescriptor());