diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java index 41e28870783e..b2aa0d50bb4b 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; @@ -36,6 +37,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; +import org.apache.avro.reflect.ReflectData; import org.hamcrest.Matchers; import org.junit.Ignore; import org.junit.Rule; @@ -55,6 +57,7 @@ import java.util.List; import java.util.Random; +import javax.annotation.Nullable; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; @@ -156,12 +159,12 @@ public class XmlSourceTest { @XmlRootElement static class Train { public static final int TRAIN_NUMBER_UNDEFINED = -1; - public String name = null; - public String color = null; + @Nullable public String name = null; + @Nullable public String color = null; public int number = TRAIN_NUMBER_UNDEFINED; @XmlAttribute(name = "size") - public String size = null; + @Nullable public String size = null; public Train() {} @@ -576,7 +579,11 @@ public void testReadXMLSmallDataflow() throws IOException { .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + // The default JAXB Coder is not suitable as part of a composite coder. + AvroCoder intermediateCoder = + AvroCoder.of(Train.class, ReflectData.AllowNull.get().getSchema(Train.class)); + PCollection output = + p.apply(Read.from(source).named("ReadFileData")).setCoder(intermediateCoder); List expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -665,7 +672,9 @@ public void testReadXMLLargeDataflow() throws IOException { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + AvroCoder intermediateCoder = AvroCoder.of(Train.class); + PCollection output = + p.apply(Read.from(source).named("ReadFileData")).setCoder(intermediateCoder); DataflowAssert.that(output).containsInAnyOrder(trains); p.run(); @@ -812,7 +821,9 @@ public void testReadXMLFilePattern() throws IOException { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + AvroCoder intermediateCoder = AvroCoder.of(Train.class); + PCollection output = + p.apply(Read.from(source).named("ReadFileData")).setCoder(intermediateCoder); List expectedResults = new ArrayList<>(); expectedResults.addAll(trains1);