Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class BeamModulePlugin implements Plugin<Project> {
def cassandra_driver_version = "3.10.2"
def checkerframework_version = "3.5.0"
def classgraph_version = "4.8.65"
def gax_version = "1.57.1"
def gax_version = "1.60.0"
def generated_grpc_ga_version = "1.85.1"
def google_auth_version = "0.19.0"
def google_clients_version = "1.30.10"
Expand All @@ -403,7 +403,7 @@ class BeamModulePlugin implements Plugin<Project> {
def google_code_gson_version = "2.8.6"
def google_http_clients_version = "1.34.0"
def google_oauth_clients_version = "1.31.0"
def grpc_version = "1.27.2"
def grpc_version = "1.32.2"
def guava_version = "25.1-jre"
def hadoop_version = "2.8.5"
def hamcrest_version = "2.1"
Expand All @@ -416,7 +416,7 @@ class BeamModulePlugin implements Plugin<Project> {
def postgres_version = "42.2.16"
def powermock_version = "2.0.2"
def proto_google_common_protos_version = "1.17.0"
def protobuf_version = "3.11.1"
def protobuf_version = "3.12.0"
def quickcheck_version = "0.8"
def slf4j_version = "1.7.30"
def spark_version = "2.4.7"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
Expand Down Expand Up @@ -86,6 +88,16 @@ private DynamicMessage toDynamic(Message message) throws InvalidProtocolBufferEx
return DynamicMessage.parseFrom(message.getDescriptorForType(), message.toByteArray());
}

private static <T extends Message.Builder> T parseFrom(String str, T builder) {
CharSequence charSequence = str;
try {
TextFormat.getParser().merge(charSequence, builder);
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
return builder;
}

@Test
public void testPrimitiveSchema() {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(Primitive.getDescriptor());
Expand Down Expand Up @@ -166,7 +178,9 @@ public void testMapProtoToRow() throws InvalidProtocolBufferException {
public void testMapRowToProto() {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(MapPrimitive.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
assertEquals(MAP_PRIMITIVE_PROTO.toString(), fromRow.apply(MAP_PRIMITIVE_ROW).toString());
MapPrimitive proto =
parseFrom(fromRow.apply(MAP_PRIMITIVE_ROW).toString(), MapPrimitive.newBuilder()).build();
assertEquals(MAP_PRIMITIVE_PROTO, proto);
}

@Test
Expand All @@ -180,8 +194,10 @@ public void testNullMapProtoToRow() throws InvalidProtocolBufferException {
public void testNullMapRowToProto() {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(MapPrimitive.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
assertEquals(
NULL_MAP_PRIMITIVE_PROTO.toString(), fromRow.apply(NULL_MAP_PRIMITIVE_ROW).toString());
MapPrimitive proto =
parseFrom(fromRow.apply(NULL_MAP_PRIMITIVE_ROW).toString(), MapPrimitive.newBuilder())
.build();
assertEquals(NULL_MAP_PRIMITIVE_PROTO, proto);
}

@Test
Expand All @@ -202,9 +218,8 @@ public void testNestedProtoToRow() throws InvalidProtocolBufferException {
public void testNestedRowToProto() throws InvalidProtocolBufferException {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(Nested.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
// equality doesn't work between dynamic messages and other,
// so we compare string representation
assertEquals(NESTED_PROTO.toString(), fromRow.apply(NESTED_ROW).toString());
Nested proto = parseFrom(fromRow.apply(NESTED_ROW).toString(), Nested.newBuilder()).build();
assertEquals(NESTED_PROTO, proto);
}

@Test
Expand Down