From 25054b82292d2a9a2c539964d4a87e89e4dfdaad Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Sun, 28 Apr 2019 13:16:23 +0200 Subject: [PATCH 1/4] [BEAM-5967] Add handling of DynamicMessage in ProtoCoder The ProtoCoder was unable to handle DynamicMessage as it was unable to get a message specific parser. The Coder is expanded to handle DynamicMessage as a special case. It stores the complete descriptor set when (de)serializing. Design decision: Although DynamicMessage could in theory have a different schema per message in a single stream this doesn't make sense. The common use-case for using DynamicMessages is when the schema is not known at compile type, but is known at pipeline build time (example, like pulled from a schema registry). With this restriction we only need to store the schema (or descriptor) once when the pipeline is serialized and send to the workers. --- .../beam/sdk/testing/CoderProperties.java | 10 + .../sdk/extensions/protobuf/ProtoCoder.java | 90 ++++++- .../sdk/extensions/protobuf/ProtoDomain.java | 246 ++++++++++++++++++ .../protobuf/IsDynamicMessageEqual.java | 69 +++++ .../extensions/protobuf/ProtoCoderTest.java | 25 ++ 5 files changed, 435 insertions(+), 5 deletions(-) create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java index f10e95b9b0a7..1c43fc6e1e64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java @@ -104,6 +104,16 @@ public static void coderDecodeEncodeEqualInContext( assertThat(decodeEncode(coder, context, value), equalTo(value)); } + /** + * Verifies that for the given {@code Coder}, {@code Coder.Context}, and value of type {@code + * T}, encoding followed by decoding yields an equal value of type {@code T}. + */ + public static void coderDecodeEncodeInContext( + Coder coder, Coder.Context context, T value, org.hamcrest.Matcher matcher) + throws Exception { + assertThat(decodeEncode(coder, context, value), matcher); + } + /** * Verifies that for the given {@code Coder>}, and value of type {@code * Collection}, encoding followed by decoding yields an equal value of type {@code diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index e2a919afb10d..faca3881f1fc 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -19,11 +19,15 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.Message; import com.google.protobuf.Parser; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -107,6 +111,8 @@ */ public class ProtoCoder extends CustomCoder { + public static final long serialVersionUID = -5043999806040629525L; + /** Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}. */ public static ProtoCoder of(Class protoMessageClass) { return new ProtoCoder<>(protoMessageClass, ImmutableSet.of()); @@ -122,6 +128,36 @@ public static ProtoCoder of(TypeDescriptor protoMessag return of(protoMessageClass); } + /** + * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given + * {@link Descriptors.Descriptor}. + */ + public static ProtoCoder of(Descriptors.Descriptor protoMessageDescriptor) { + return new ProtoCoder<>( + ProtoDomain.buildFrom(protoMessageDescriptor), + protoMessageDescriptor.getFullName(), + ImmutableSet.of()); + } + + /** + * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given + * {@link Descriptors.Descriptor}. The message descriptor should be part of the provided {@link + * ProtoDomain}, this will ensure object equality within messages from the same domain. + */ + public static ProtoCoder of( + ProtoDomain domain, Descriptors.Descriptor protoMessageDescriptor) { + return new ProtoCoder<>(domain, protoMessageDescriptor.getFullName(), ImmutableSet.of()); + } + + /** + * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given + * message name in a {@link ProtoDomain}. The message descriptor should be part of the provided * + * {@link ProtoDomain}, this will ensure object equality within messages from the same domain. + */ + public static ProtoCoder of(ProtoDomain domain, String messageName) { + return new ProtoCoder<>(domain, messageName, ImmutableSet.of()); + } + /** * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes * registered. @@ -269,21 +305,65 @@ public ExtensionRegistry getExtensionRegistry() { private transient ExtensionRegistry memoizedExtensionRegistry; private transient Parser memoizedParser; + // Descriptor used by DynamicMessage. + private transient ProtoDomain domain; + private transient String messageName; + /** Private constructor. */ private ProtoCoder(Class protoMessageClass, Set> extensionHostClasses) { this.protoMessageClass = protoMessageClass; this.extensionHostClasses = extensionHostClasses; + this.domain = null; + this.messageName = null; + } + + private ProtoCoder(ProtoDomain domain, String messageName, Set> extensionHostClasses) { + @SuppressWarnings("unchecked") + Class protoMessageClass = (Class) DynamicMessage.class; + this.protoMessageClass = protoMessageClass; + this.extensionHostClasses = extensionHostClasses; + this.domain = domain; + this.messageName = messageName; + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + if (DynamicMessage.class.equals(this.protoMessageClass)) { + if (this.domain == null) { + throw new RuntimeException("DynamicMessages require provider a proto domain to the coder."); + } + oos.writeObject(domain); + oos.writeObject(messageName); + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + ois.defaultReadObject(); + if (DynamicMessage.class.equals(this.protoMessageClass)) { + this.domain = (ProtoDomain) ois.readObject(); + this.messageName = (String) ois.readObject(); + } } /** Get the memoized {@link Parser}, possibly initializing it lazily. */ private Parser getParser() { if (memoizedParser == null) { try { - @SuppressWarnings("unchecked") - T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser tParser = (Parser) protoMessageInstance.getParserForType(); - memoizedParser = tParser; + if (DynamicMessage.class.equals(protoMessageClass)) { + @SuppressWarnings("unchecked") + T protoMessageInstance = + (T) DynamicMessage.newBuilder(domain.getDescriptor(messageName)).build(); + @SuppressWarnings("unchecked") + Parser tParser = (Parser) protoMessageInstance.getParserForType(); + memoizedParser = tParser; + } else { + @SuppressWarnings("unchecked") + T protoMessageInstance = + (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); + @SuppressWarnings("unchecked") + Parser tParser = (Parser) protoMessageInstance.getParserForType(); + memoizedParser = tParser; + } } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new IllegalArgumentException(e); } diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java new file mode 100644 index 000000000000..d1aca66cd206 --- /dev/null +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * ProtoDomain is a container class for Protobuf descriptors. By using a domain for all descriptors + * that are related to each other the FileDescriptorSet needs to be serialized only once in the + * graph. + * + *

Using a domain also grantees that all Descriptors have object equality, just like statically + * compiled Proto classes Descriptors. A lot of Java code isn't used to the new DynamicMessages an + * assume always Object equality. Because of this the domain class is immutable. + * + *

ProtoDomains aren't assumed to be used on with normal Message descriptors, only with + * DynamicMessage descriptors. + */ +public final class ProtoDomain implements Serializable { + public static final long serialVersionUID = 1L; + private transient DescriptorProtos.FileDescriptorSet fileDescriptorSet; + private transient int hashCode; + + private transient Map fileDescriptorMap; + private transient Map descriptorMap; + + private transient Map fileOptionMap; + private transient Map messageOptionMap; + private transient Map fieldOptionMap; + + ProtoDomain() { + this(DescriptorProtos.FileDescriptorSet.newBuilder().build()); + } + + private ProtoDomain(DescriptorProtos.FileDescriptorSet fileDescriptorSet) { + this.fileDescriptorSet = fileDescriptorSet; + hashCode = java.util.Arrays.hashCode(this.fileDescriptorSet.toByteArray()); + crosswire(); + } + + private static Map extractProtoMap( + DescriptorProtos.FileDescriptorSet fileDescriptorSet) { + HashMap map = new HashMap<>(); + fileDescriptorSet.getFileList().forEach(fdp -> map.put(fdp.getName(), fdp)); + return map; + } + + private static Descriptors.FileDescriptor convertToFileDescriptorMap( + String name, + Map inMap, + Map outMap) { + if (outMap.containsKey(name)) { + return outMap.get(name); + } + DescriptorProtos.FileDescriptorProto fileDescriptorProto = inMap.get(name); + if (fileDescriptorProto == null) { + if ("google/protobuf/descriptor.proto".equals(name)) { + outMap.put( + "google/protobuf/descriptor.proto", + DescriptorProtos.FieldOptions.getDescriptor().getFile()); + return DescriptorProtos.FieldOptions.getDescriptor().getFile(); + } + return null; + } else { + List dependencies = new ArrayList<>(); + if (fileDescriptorProto.getDependencyCount() > 0) { + fileDescriptorProto + .getDependencyList() + .forEach( + dependencyName -> { + Descriptors.FileDescriptor fileDescriptor = + convertToFileDescriptorMap(dependencyName, inMap, outMap); + if (fileDescriptor != null) { + dependencies.add(fileDescriptor); + } + }); + } + try { + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, dependencies.toArray(new Descriptors.FileDescriptor[0])); + outMap.put(name, fileDescriptor); + return fileDescriptor; + } catch (Descriptors.DescriptorValidationException e) { + throw new RuntimeException(e); + } + } + } + + private static void visitFileDescriptorTree(Map map, Descriptors.FileDescriptor fileDescriptor) { + if (!map.containsKey(fileDescriptor.getName())) { + map.put(fileDescriptor.getName(), fileDescriptor); + List dependencies = fileDescriptor.getDependencies(); + dependencies.forEach(fd -> visitFileDescriptorTree(map, fd)); + } + } + + public static ProtoDomain buildFrom(Descriptors.Descriptor descriptor) { + return buildFrom(descriptor.getFile()); + } + + public static ProtoDomain buildFrom(DescriptorProtos.FileDescriptorSet fileDescriptorSet) { + return new ProtoDomain(fileDescriptorSet); + } + + public static ProtoDomain buildFrom(Descriptors.FileDescriptor fileDescriptor) { + HashMap fileDescriptorMap = new HashMap<>(); + visitFileDescriptorTree(fileDescriptorMap, fileDescriptor); + DescriptorProtos.FileDescriptorSet.Builder builder = + DescriptorProtos.FileDescriptorSet.newBuilder(); + fileDescriptorMap.values().forEach(fd -> builder.addFile(fd.toProto())); + return new ProtoDomain(builder.build()); + } + + public static ProtoDomain buildFrom(InputStream inputStream) throws IOException { + return buildFrom(DescriptorProtos.FileDescriptorSet.parseFrom(inputStream)); + } + + private void crosswire() { + HashMap map = new HashMap<>(); + fileDescriptorSet.getFileList().forEach(fdp -> map.put(fdp.getName(), fdp)); + + Map outMap = new HashMap<>(); + map.forEach((fileName, proto) -> convertToFileDescriptorMap(fileName, map, outMap)); + fileDescriptorMap = outMap; + + indexOptionsByNumber(fileDescriptorMap.values()); + indexDescriptorByName(); + } + + private void indexDescriptorByName() { + descriptorMap = new HashMap<>(); + fileDescriptorMap + .values() + .forEach( + fileDescriptor -> { + fileDescriptor + .getMessageTypes() + .forEach( + descriptor -> { + descriptorMap.put(descriptor.getFullName(), descriptor); + }); + }); + } + + private void indexOptionsByNumber(Collection fileDescriptors) { + fieldOptionMap = new HashMap<>(); + fileOptionMap = new HashMap<>(); + messageOptionMap = new HashMap<>(); + fileDescriptors.forEach( + (fileDescriptor) -> { + fileDescriptor + .getExtensions() + .forEach( + extension -> { + switch (extension.toProto().getExtendee()) { + case ".google.protobuf.FileOptions": + fileOptionMap.put(extension.getNumber(), extension); + break; + case ".google.protobuf.MessageOptions": + messageOptionMap.put(extension.getNumber(), extension); + break; + case ".google.protobuf.FieldOptions": + fieldOptionMap.put(extension.getNumber(), extension); + break; + default: + break; + } + }); + }); + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + byte[] buffer = fileDescriptorSet.toByteArray(); + oos.writeInt(buffer.length); + oos.write(buffer); + } + + private void readObject(ObjectInputStream ois) throws IOException { + byte[] buffer = new byte[ois.readInt()]; + ois.readFully(buffer); + fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(buffer); + hashCode = java.util.Arrays.hashCode(buffer); + crosswire(); + } + + public Descriptors.FileDescriptor getFileDescriptor(String name) { + return fileDescriptorMap.get(name); + } + + public Descriptors.Descriptor getDescriptor(String fullName) { + return descriptorMap.get(fullName); + } + + public Descriptors.FieldDescriptor getFieldOptionById(int id) { + return fieldOptionMap.get(id); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ProtoDomain that = (ProtoDomain) o; + return hashCode == that.hashCode; + } + + @Override + public int hashCode() { + return Objects.hash(hashCode); + } + + public boolean contains(Descriptors.Descriptor descriptor) { + return getDescriptor(descriptor.getFullName()) != null; + } +} diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java new file mode 100644 index 000000000000..7b079631576f --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +/** + * Is the DynamicMessage equal to another message. This special matcher exist because the + * DynamicMessage is protobuf does a object equality in it's equals operator. + * + *

Follow https://github.com/protocolbuffers/protobuf/issues/6100 for tracking the issue. If this + * is resolved we can remove this code. + */ +public class IsDynamicMessageEqual extends BaseMatcher { + private final DynamicMessage expectedValue; + + public IsDynamicMessageEqual(DynamicMessage equalArg) { + expectedValue = equalArg; + } + + public static IsDynamicMessageEqual equalTo(DynamicMessage operand) { + return new IsDynamicMessageEqual(operand); + } + + @Override + public boolean matches(Object actualValue) { + + if (actualValue == null) { + return expectedValue == null; + } + + if (!(actualValue instanceof Message)) { + return false; + } + final Message actualMessage = (Message) actualValue; + + if (!actualMessage.toByteString().equals(expectedValue.toByteString())) { + return false; + } + + return actualMessage + .getDescriptorForType() + .getFullName() + .equals(expectedValue.getDescriptorForType().getFullName()); + } + + @Override + public void describeTo(Description description) { + description.appendValue(expectedValue); + } +} diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java index 04ed9a67dd1a..25c62517e5dc 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java @@ -17,9 +17,12 @@ */ package org.apache.beam.sdk.extensions.protobuf; +import static org.apache.beam.sdk.testing.CoderProperties.ALL_CONTEXTS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import com.google.protobuf.DynamicMessage; +import java.io.ObjectStreamClass; import java.util.Collections; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -167,4 +170,26 @@ public void testNonDeterministicProperty() throws CoderException { Coder coder = ProtoCoder.of(MessageWithMap.class); assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1)); } + + @Test + public void testDynamicMessage() throws Exception { + DynamicMessage message = + DynamicMessage.newBuilder(MessageA.getDescriptor()) + .setField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") + .build(); + Coder coder = ProtoCoder.of(message.getDescriptorForType()); + + // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) + for (Coder.Context context : ALL_CONTEXTS) { + CoderProperties.coderDecodeEncodeInContext( + coder, context, message, IsDynamicMessageEqual.equalTo(message)); + } + } + + @Test + public void testSerialVersionID() { + long serialVersionID = ObjectStreamClass.lookup(ProtoCoder.class).getSerialVersionUID(); + assertEquals(-5043999806040629525L, serialVersionID); + } } From ba4a121254c47fcebdd8810aad139e881a250e31 Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Mon, 14 Oct 2019 22:27:38 +0200 Subject: [PATCH 2/4] [BEAM-5967] Add Nullable annotations and corrected JavaDoc after review --- .../main/java/org/apache/beam/sdk/testing/CoderProperties.java | 3 ++- .../org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java index 1c43fc6e1e64..e89cacd87906 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java @@ -106,7 +106,8 @@ public static void coderDecodeEncodeEqualInContext( /** * Verifies that for the given {@code Coder}, {@code Coder.Context}, and value of type {@code - * T}, encoding followed by decoding yields an equal value of type {@code T}. + * T}, encoding followed by decoding yields a value of type {@code T} and tests that the matcher + * succeeds on the values. */ public static void coderDecodeEncodeInContext( Coder coder, Coder.Context context, T value, org.hamcrest.Matcher matcher) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java index d1aca66cd206..e9a5d48ed35b 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import javax.annotation.Nullable; /** * ProtoDomain is a container class for Protobuf descriptors. By using a domain for all descriptors @@ -72,6 +73,7 @@ private static Map extractProtoMap return map; } + @Nullable private static Descriptors.FileDescriptor convertToFileDescriptorMap( String name, Map inMap, From cb774734f24ac143b4cbde1ae6d0d925c8b35499 Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Fri, 18 Oct 2019 00:16:35 +0200 Subject: [PATCH 3/4] Backported ProtoDomainTest for schema aware PR --- .../extensions/protobuf/ProtoCoderTest.java | 30 ++++++++++ .../extensions/protobuf/ProtoDomainTest.java | 55 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java index 25c62517e5dc..0a493931c9ea 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java @@ -187,6 +187,36 @@ public void testDynamicMessage() throws Exception { } } + @Test + public void testDynamicNestedRepeatedMessage() throws Exception { + DynamicMessage message = + DynamicMessage.newBuilder(MessageA.getDescriptor()) + .setField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") + .addRepeatedField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), + DynamicMessage.newBuilder(MessageB.getDescriptor()) + .setField( + MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), + true) + .build()) + .addRepeatedField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), + DynamicMessage.newBuilder(MessageB.getDescriptor()) + .setField( + MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), + false) + .build()) + .build(); + Coder coder = ProtoCoder.of(message.getDescriptorForType()); + + // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) + for (Coder.Context context : ALL_CONTEXTS) { + CoderProperties.coderDecodeEncodeInContext( + coder, context, message, IsDynamicMessageEqual.equalTo(message)); + } + } + @Test public void testSerialVersionID() { long serialVersionID = ObjectStreamClass.lookup(ProtoCoder.class).getSerialVersionUID(); diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java new file mode 100644 index 000000000000..5ff909bdf7ab --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ProtoDomain}. */ +@RunWith(JUnit4.class) +public class ProtoDomainTest { + + @Test + public void testNamespaceEqual() { + ProtoDomain domainFromInt32 = ProtoDomain.buildFrom(Int32Value.getDescriptor()); + ProtoDomain domainFromInt64 = ProtoDomain.buildFrom(Int64Value.getDescriptor()); + Assert.assertTrue(domainFromInt64.equals(domainFromInt32)); + } + + @Test + public void testContainsDescriptor() { + ProtoDomain domainFromInt32 = ProtoDomain.buildFrom(Int32Value.getDescriptor()); + Assert.assertTrue(domainFromInt32.contains(Int32Value.getDescriptor())); + } + + @Test + public void testContainsOtherDescriptorSameFile() { + ProtoDomain domain = ProtoDomain.buildFrom(Int32Value.getDescriptor()); + Assert.assertTrue(domain.contains(Int64Value.getDescriptor())); + } + + @Test + public void testBuildForFile() { + ProtoDomain domain = ProtoDomain.buildFrom(Int32Value.getDescriptor().getFile()); + Assert.assertNotNull(domain.getFileDescriptor("google/protobuf/wrappers.proto")); + } +} From 64829e18a40bab6f76db4aef3a06d2a32254533e Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Mon, 21 Oct 2019 22:48:15 +0200 Subject: [PATCH 4/4] Split the ProtoCoder into ProtoCoder and DynamicProtoCoder. --- .../protobuf/DynamicProtoCoder.java | 205 ++++++++++++++++++ .../sdk/extensions/protobuf/ProtoCoder.java | 119 ++-------- .../protobuf/DynamicProtoCoderTest.java | 92 ++++++++ .../extensions/protobuf/ProtoCoderTest.java | 48 ---- 4 files changed, 320 insertions(+), 144 deletions(-) create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java new file mode 100644 index 000000000000..96ca0faba7b4 --- /dev/null +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; + +/** + * A {@link Coder} using Google Protocol Buffers binary format. {@link DynamicProtoCoder} supports + * both Protocol Buffers syntax versions 2 and 3. + * + *

To learn more about Protocol Buffers, visit: https://developers.google.com/protocol-buffers + * + *

{@link DynamicProtoCoder} is not registered in the global {@link CoderRegistry} as the + * descriptor is required to create the coder. + */ +public class DynamicProtoCoder extends ProtoCoder { + + public static final long serialVersionUID = 1L; + + /** + * Returns a {@link DynamicProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the + * given {@link Descriptors.Descriptor}. + */ + public static DynamicProtoCoder of(Descriptors.Descriptor protoMessageDescriptor) { + return new DynamicProtoCoder( + ProtoDomain.buildFrom(protoMessageDescriptor), + protoMessageDescriptor.getFullName(), + ImmutableSet.of()); + } + + /** + * Returns a {@link DynamicProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the + * given {@link Descriptors.Descriptor}. The message descriptor should be part of the provided + * {@link ProtoDomain}, this will ensure object equality within messages from the same domain. + */ + public static DynamicProtoCoder of( + ProtoDomain domain, Descriptors.Descriptor protoMessageDescriptor) { + return new DynamicProtoCoder(domain, protoMessageDescriptor.getFullName(), ImmutableSet.of()); + } + + /** + * Returns a {@link DynamicProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the + * given message name in a {@link ProtoDomain}. The message descriptor should be part of the + * provided * {@link ProtoDomain}, this will ensure object equality within messages from the same + * domain. + */ + public static DynamicProtoCoder of(ProtoDomain domain, String messageName) { + return new DynamicProtoCoder(domain, messageName, ImmutableSet.of()); + } + + /** + * Returns a {@link DynamicProtoCoder} like this one, but with the extensions from the given + * classes registered. + * + *

Each of the extension host classes must be an class automatically generated by the Protocol + * Buffers compiler, {@code protoc}, that contains messages. + * + *

Does not modify this object. + */ + @Override + public DynamicProtoCoder withExtensionsFrom(Iterable> moreExtensionHosts) { + validateExtensions(moreExtensionHosts); + return new DynamicProtoCoder( + this.domain, + this.messageName, + new ImmutableSet.Builder>() + .addAll(extensionHostClasses) + .addAll(moreExtensionHosts) + .build()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + DynamicProtoCoder otherCoder = (DynamicProtoCoder) other; + return protoMessageClass.equals(otherCoder.protoMessageClass) + && Sets.newHashSet(extensionHostClasses) + .equals(Sets.newHashSet(otherCoder.extensionHostClasses)) + && domain.equals(otherCoder.domain) + && messageName.equals(otherCoder.messageName); + } + + @Override + public int hashCode() { + return Objects.hash(protoMessageClass, extensionHostClasses, domain, messageName); + } + + //////////////////////////////////////////////////////////////////////////////////// + // Private implementation details below. + + // Constants used to serialize and deserialize + private static final String PROTO_MESSAGE_CLASS = "dynamic_proto_message_class"; + private static final String PROTO_EXTENSION_HOSTS = "dynamic_proto_extension_hosts"; + + // Descriptor used by DynamicMessage. + private transient ProtoDomain domain; + private transient String messageName; + + private DynamicProtoCoder( + ProtoDomain domain, String messageName, Set> extensionHostClasses) { + super(DynamicMessage.class, extensionHostClasses); + this.domain = domain; + this.messageName = messageName; + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + oos.writeObject(domain); + oos.writeObject(messageName); + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + ois.defaultReadObject(); + this.domain = (ProtoDomain) ois.readObject(); + this.messageName = (String) ois.readObject(); + } + + /** Get the memoized {@link Parser}, possibly initializing it lazily. */ + @Override + protected Parser getParser() { + if (memoizedParser == null) { + DynamicMessage protoMessageInstance = + DynamicMessage.newBuilder(domain.getDescriptor(messageName)).build(); + memoizedParser = protoMessageInstance.getParserForType(); + } + return memoizedParser; + } + + /** + * Returns a {@link CoderProvider} which uses the {@link DynamicProtoCoder} for {@link Message + * proto messages}. + * + *

This method is invoked reflectively from {@link DefaultCoder}. + */ + public static CoderProvider getCoderProvider() { + return new ProtoCoderProvider(); + } + + static final TypeDescriptor MESSAGE_TYPE = new TypeDescriptor() {}; + + /** A {@link CoderProvider} for {@link Message proto messages}. */ + private static class ProtoCoderProvider extends CoderProvider { + + @Override + public Coder coderFor( + TypeDescriptor typeDescriptor, List> componentCoders) + throws CannotProvideCoderException { + if (!typeDescriptor.isSubtypeOf(MESSAGE_TYPE)) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide %s because %s is not a subclass of %s", + DynamicProtoCoder.class.getSimpleName(), typeDescriptor, Message.class.getName())); + } + + @SuppressWarnings("unchecked") + TypeDescriptor messageType = + (TypeDescriptor) typeDescriptor; + try { + @SuppressWarnings("unchecked") + Coder coder = (Coder) DynamicProtoCoder.of(messageType); + return coder; + } catch (IllegalArgumentException e) { + throw new CannotProvideCoderException(e); + } + } + } +} diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index faca3881f1fc..0b2d717da288 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -19,15 +19,12 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.Message; import com.google.protobuf.Parser; import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -36,8 +33,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -129,45 +124,11 @@ public static ProtoCoder of(TypeDescriptor protoMessag } /** - * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given - * {@link Descriptors.Descriptor}. - */ - public static ProtoCoder of(Descriptors.Descriptor protoMessageDescriptor) { - return new ProtoCoder<>( - ProtoDomain.buildFrom(protoMessageDescriptor), - protoMessageDescriptor.getFullName(), - ImmutableSet.of()); - } - - /** - * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given - * {@link Descriptors.Descriptor}. The message descriptor should be part of the provided {@link - * ProtoDomain}, this will ensure object equality within messages from the same domain. - */ - public static ProtoCoder of( - ProtoDomain domain, Descriptors.Descriptor protoMessageDescriptor) { - return new ProtoCoder<>(domain, protoMessageDescriptor.getFullName(), ImmutableSet.of()); - } - - /** - * Returns a {@link ProtoCoder} for the Protocol Buffers {@link DynamicMessage} for the given - * message name in a {@link ProtoDomain}. The message descriptor should be part of the provided * - * {@link ProtoDomain}, this will ensure object equality within messages from the same domain. - */ - public static ProtoCoder of(ProtoDomain domain, String messageName) { - return new ProtoCoder<>(domain, messageName, ImmutableSet.of()); - } - - /** - * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes - * registered. + * Validate that all extensionHosts are able to be registered. * - *

Each of the extension host classes must be an class automatically generated by the Protocol - * Buffers compiler, {@code protoc}, that contains messages. - * - *

Does not modify this object. + * @param moreExtensionHosts */ - public ProtoCoder withExtensionsFrom(Iterable> moreExtensionHosts) { + void validateExtensions(Iterable> moreExtensionHosts) { for (Class extensionHost : moreExtensionHosts) { // Attempt to access the required method, to make sure it's present. try { @@ -182,7 +143,19 @@ public ProtoCoder withExtensionsFrom(Iterable> moreExtensionHosts) { e); } } + } + /** + * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes + * registered. + * + *

Each of the extension host classes must be an class automatically generated by the Protocol + * Buffers compiler, {@code protoc}, that contains messages. + * + *

Does not modify this object. + */ + public ProtoCoder withExtensionsFrom(Iterable> moreExtensionHosts) { + validateExtensions(moreExtensionHosts); return new ProtoCoder<>( protoMessageClass, new ImmutableSet.Builder>() @@ -236,7 +209,7 @@ public boolean equals(Object other) { if (this == other) { return true; } - if (!(other instanceof ProtoCoder)) { + if (other == null || getClass() != other.getClass()) { return false; } ProtoCoder otherCoder = (ProtoCoder) other; @@ -289,13 +262,13 @@ public ExtensionRegistry getExtensionRegistry() { // Private implementation details below. /** The {@link Message} type to be coded. */ - private final Class protoMessageClass; + final Class protoMessageClass; /** * All extension host classes included in this {@link ProtoCoder}. The extensions from these * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding. */ - private final Set> extensionHostClasses; + final Set> extensionHostClasses; // Constants used to serialize and deserialize private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; @@ -303,59 +276,21 @@ public ExtensionRegistry getExtensionRegistry() { // Transient fields that are lazy initialized and then memoized. private transient ExtensionRegistry memoizedExtensionRegistry; - private transient Parser memoizedParser; - - // Descriptor used by DynamicMessage. - private transient ProtoDomain domain; - private transient String messageName; + transient Parser memoizedParser; /** Private constructor. */ - private ProtoCoder(Class protoMessageClass, Set> extensionHostClasses) { + protected ProtoCoder(Class protoMessageClass, Set> extensionHostClasses) { this.protoMessageClass = protoMessageClass; this.extensionHostClasses = extensionHostClasses; - this.domain = null; - this.messageName = null; - } - - private ProtoCoder(ProtoDomain domain, String messageName, Set> extensionHostClasses) { - @SuppressWarnings("unchecked") - Class protoMessageClass = (Class) DynamicMessage.class; - this.protoMessageClass = protoMessageClass; - this.extensionHostClasses = extensionHostClasses; - this.domain = domain; - this.messageName = messageName; - } - - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.defaultWriteObject(); - if (DynamicMessage.class.equals(this.protoMessageClass)) { - if (this.domain == null) { - throw new RuntimeException("DynamicMessages require provider a proto domain to the coder."); - } - oos.writeObject(domain); - oos.writeObject(messageName); - } - } - - private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { - ois.defaultReadObject(); - if (DynamicMessage.class.equals(this.protoMessageClass)) { - this.domain = (ProtoDomain) ois.readObject(); - this.messageName = (String) ois.readObject(); - } } /** Get the memoized {@link Parser}, possibly initializing it lazily. */ - private Parser getParser() { + protected Parser getParser() { if (memoizedParser == null) { try { if (DynamicMessage.class.equals(protoMessageClass)) { - @SuppressWarnings("unchecked") - T protoMessageInstance = - (T) DynamicMessage.newBuilder(domain.getDescriptor(messageName)).build(); - @SuppressWarnings("unchecked") - Parser tParser = (Parser) protoMessageInstance.getParserForType(); - memoizedParser = tParser; + throw new IllegalArgumentException( + "DynamicMessage is not supported by the ProtoCoder, use the DynamicProtoCoder."); } else { @SuppressWarnings("unchecked") T protoMessageInstance = @@ -409,12 +344,4 @@ public Coder coderFor( } } } - - private SortedSet getSortedExtensionClasses() { - SortedSet ret = new TreeSet<>(); - for (Class clazz : extensionHostClasses) { - ret.add(clazz.getName()); - } - return ret; - } } diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java new file mode 100644 index 000000000000..10395832b36a --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import static org.apache.beam.sdk.testing.CoderProperties.ALL_CONTEXTS; +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.DynamicMessage; +import java.io.ObjectStreamClass; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageA; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageB; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ProtoCoder}. */ +@RunWith(JUnit4.class) +public class DynamicProtoCoderTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDynamicMessage() throws Exception { + DynamicMessage message = + DynamicMessage.newBuilder(MessageA.getDescriptor()) + .setField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") + .build(); + Coder coder = DynamicProtoCoder.of(message.getDescriptorForType()); + + // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) + for (Coder.Context context : ALL_CONTEXTS) { + CoderProperties.coderDecodeEncodeInContext( + coder, context, message, IsDynamicMessageEqual.equalTo(message)); + } + } + + @Test + public void testDynamicNestedRepeatedMessage() throws Exception { + DynamicMessage message = + DynamicMessage.newBuilder(MessageA.getDescriptor()) + .setField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") + .addRepeatedField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), + DynamicMessage.newBuilder(MessageB.getDescriptor()) + .setField( + MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), + true) + .build()) + .addRepeatedField( + MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), + DynamicMessage.newBuilder(MessageB.getDescriptor()) + .setField( + MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), + false) + .build()) + .build(); + Coder coder = DynamicProtoCoder.of(message.getDescriptorForType()); + + // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) + for (Coder.Context context : ALL_CONTEXTS) { + CoderProperties.coderDecodeEncodeInContext( + coder, context, message, IsDynamicMessageEqual.equalTo(message)); + } + } + + @Test + public void testSerialVersionID() { + long serialVersionID = ObjectStreamClass.lookup(DynamicProtoCoder.class).getSerialVersionUID(); + assertEquals(1L, serialVersionID); + } +} diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java index 0a493931c9ea..38aa92bfc223 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java @@ -17,11 +17,9 @@ */ package org.apache.beam.sdk.extensions.protobuf; -import static org.apache.beam.sdk.testing.CoderProperties.ALL_CONTEXTS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import com.google.protobuf.DynamicMessage; import java.io.ObjectStreamClass; import java.util.Collections; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -171,52 +169,6 @@ public void testNonDeterministicProperty() throws CoderException { assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1)); } - @Test - public void testDynamicMessage() throws Exception { - DynamicMessage message = - DynamicMessage.newBuilder(MessageA.getDescriptor()) - .setField( - MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") - .build(); - Coder coder = ProtoCoder.of(message.getDescriptorForType()); - - // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) - for (Coder.Context context : ALL_CONTEXTS) { - CoderProperties.coderDecodeEncodeInContext( - coder, context, message, IsDynamicMessageEqual.equalTo(message)); - } - } - - @Test - public void testDynamicNestedRepeatedMessage() throws Exception { - DynamicMessage message = - DynamicMessage.newBuilder(MessageA.getDescriptor()) - .setField( - MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD1_FIELD_NUMBER), "foo") - .addRepeatedField( - MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), - DynamicMessage.newBuilder(MessageB.getDescriptor()) - .setField( - MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), - true) - .build()) - .addRepeatedField( - MessageA.getDescriptor().findFieldByNumber(MessageA.FIELD2_FIELD_NUMBER), - DynamicMessage.newBuilder(MessageB.getDescriptor()) - .setField( - MessageB.getDescriptor().findFieldByNumber(MessageB.FIELD1_FIELD_NUMBER), - false) - .build()) - .build(); - Coder coder = ProtoCoder.of(message.getDescriptorForType()); - - // Special code to check the DynamicMessage equality (@see IsDynamicMessageEqual) - for (Coder.Context context : ALL_CONTEXTS) { - CoderProperties.coderDecodeEncodeInContext( - coder, context, message, IsDynamicMessageEqual.equalTo(message)); - } - } - @Test public void testSerialVersionID() { long serialVersionID = ObjectStreamClass.lookup(ProtoCoder.class).getSerialVersionUID();