diff --git a/.test-infra/jenkins/job_PostCommit_Java_Avro_Versions.groovy b/.test-infra/jenkins/job_PostCommit_Java_Avro_Versions.groovy new file mode 100644 index 000000000000..ea07cb263928 --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_Java_Avro_Versions.groovy @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PostcommitJobBuilder + + +// This job runs the Java tests that depends on Avro against different Avro API versions +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_Avro_Versions', 'Run PostCommit_Java_Avro_Versions', + 'Java Avro Versions Post Commit Tests', this) { + + description('Java Avro Versions Post Commit Tests') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240) + + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + + // Gradle goals for this job. + steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(":javaAvroVersionsTest") + commonJobProperties.setGradleSwitches(delegate) + // Specify maven home on Jenkins, needed by Maven archetype integration tests. + switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.4') + } + } + } diff --git a/.test-infra/jenkins/job_PreCommit_Java.groovy b/.test-infra/jenkins/job_PreCommit_Java.groovy index bc94a418d2e9..e3a79709563f 100644 --- a/.test-infra/jenkins/job_PreCommit_Java.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java.groovy @@ -20,6 +20,7 @@ import PrecommitJobBuilder // exclude paths with their own PreCommit tasks def excludePaths = [ + 'extensions/avro', 'extensions/sql', 'io/amazon-web-services', 'io/amazon-web-services2', diff --git a/build.gradle.kts b/build.gradle.kts index 85088a262760..995e55de4543 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -393,6 +393,10 @@ tasks.register("javaHadoopVersionsTest") { dependsOn(":runners:spark:3:hadoopVersionsTest") } +tasks.register("javaAvroVersionsTest") { + dependsOn(":sdks:java:extensions:avro:avroVersionsTest") +} + tasks.register("sqlPostCommit") { dependsOn(":sdks:java:extensions:sql:postCommit") dependsOn(":sdks:java:extensions:sql:jdbc:postCommit") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java index bb0003c4d4dc..6bcb98cff4ab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java @@ -283,6 +283,11 @@ public T convert(TypeDescriptor typeDescriptor) { } } + protected StackManipulation shortCircuitReturnNull( + StackManipulation readValue, StackManipulation onNotNull) { + return new ShortCircuitReturnNull(readValue, onNotNull); + } + protected abstract T convertArray(TypeDescriptor type); protected abstract T convertIterable(TypeDescriptor type); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java index 494c47ee259c..1f039b1989c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java @@ -126,6 +126,8 @@ private static boolean fieldsEquivalent(Object expected, Object actual, FieldTyp (Map) expected, (Map) actual, fieldType.getMapValueType()); + } else if (fieldType.getTypeName() == TypeName.ROW) { + return rowsEquivalent((Row) expected, (Row) actual); } else { return Objects.equals(expected, actual); } diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle index f73a9efccfb5..4aa765c08548 100644 --- a/sdks/java/extensions/avro/build.gradle +++ b/sdks/java/extensions/avro/build.gradle @@ -1,3 +1,5 @@ +import java.util.stream.Collectors + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,6 +29,14 @@ applyAvroNature() description = "Apache Beam :: SDKs :: Java :: Extensions :: Avro" +def avroVersions = [ + '192' : "1.9.2", + '1102': "1.10.2", + '1111': "1.11.1", +] + +avroVersions.each { k, v -> configurations.create("avroVersion$k") } + // Exclude tests that need a runner test { systemProperty "beamUseDummyRunner", "true" @@ -38,14 +48,14 @@ test { dependencies { implementation library.java.byte_buddy implementation library.java.vendored_guava_26_0_jre - implementation (project(path: ":sdks:java:core", configuration: "shadow")) { + implementation(project(path: ":sdks:java:core", configuration: "shadow")) { // Exclude Avro dependencies from "core" since Avro support moved to this extension exclude group: "org.apache.avro", module: "avro" } implementation library.java.error_prone_annotations implementation library.java.avro implementation library.java.joda_time - testImplementation (project(path: ":sdks:java:core", configuration: "shadowTest")) { + testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) { // Exclude Avro dependencies from "core" since Avro support moved to this extension exclude group: "org.apache.avro", module: "avro" } @@ -53,4 +63,79 @@ dependencies { testImplementation library.java.junit testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly library.java.slf4j_jdk14 -} \ No newline at end of file + avroVersions.each { + "avroVersion$it.key" "org.apache.avro:avro:$it.value" + "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value" + } +} + +avroVersions.each { kv -> + configurations."avroVersion$kv.key" { + resolutionStrategy { + force "org.apache.avro:avro:$kv.value" + } + } + + sourceSets { + "avro${kv.key}" { + java { + srcDirs "build/generated/sources/avro${kv.key}/test/java" + } + + compileClasspath = configurations."avroVersion$kv.key" + sourceSets.test.output + sourceSets.test.compileClasspath + runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath + } + } + + "compileAvro${kv.key}Java" { + checkerFramework { + skipCheckerFramework = true + } + } + + "spotbugsAvro${kv.key}" { + ignoreFailures = true + } + + "generateAvro${kv.key}AvroJava" { + dependsOn "generateAvroClasses${kv.key}" + } + + task "avroVersion${kv.key}Test"(type: Test) { + group = "Verification" + description = "Runs Avro extension tests with Avro version $kv.value" + outputs.upToDateWhen { false } + classpath = sourceSets."avro${kv.key}".runtimeClasspath + + include '**/*.class' + exclude '**/AvroIOTest$NeedsRunnerTests$*.class' + + dependsOn "generateAvroClasses${kv.key}" + } + + task "generateAvroClasses${kv.key}"(type: JavaExec) { + group = "build" + description = "Generate Avro classes for Avro version $kv.value" + classpath = configurations."avroVersion$kv.key" + main = "org.apache.avro.tool.Main" + args = [ + "compile", + "schema", + "src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc", + "src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc", + "build/generated/sources/avro${kv.key}/test/java" + ] + } +} + +task avroVersionsTest { + group = "Verification" + description = 'Runs Avro extension tests with different Avro API versions' + dependsOn createTaskNames(avroVersions, "Test") +} + +static def createTaskNames(Map prefixMap, String suffix) { + return prefixMap.keySet().stream() + .map { version -> "avroVersion${version}${suffix}" } + .collect(Collectors.toList()) +} diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java index 4687eb566424..186a9a76557b 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java @@ -256,7 +256,7 @@ private Object readResolve() throws IOException, ClassNotFoundException { * Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's * serialization and hence is able to encode the {@link Schema} object directly. */ - private static class SerializableSchemaSupplier implements Serializable, Supplier { + static class SerializableSchemaSupplier implements Serializable, Supplier { // writeReplace makes this object serializable. This is a limitation of FindBugs as discussed // here: // http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index fcd8c11089b8..f062326b2da4 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -34,14 +34,18 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; +import net.bytebuddy.implementation.bytecode.Division; import net.bytebuddy.implementation.bytecode.Duplication; +import net.bytebuddy.implementation.bytecode.Multiplication; import net.bytebuddy.implementation.bytecode.StackManipulation; import net.bytebuddy.implementation.bytecode.StackManipulation.Compound; import net.bytebuddy.implementation.bytecode.TypeCreation; import net.bytebuddy.implementation.bytecode.assign.TypeCasting; +import net.bytebuddy.implementation.bytecode.constant.LongConstant; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; import net.bytebuddy.matcher.ElementMatchers; import org.apache.avro.AvroRuntimeException; @@ -144,6 +148,7 @@ "rawtypes" }) public class AvroUtils { + static { // This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling // of DateTime types. @@ -151,6 +156,14 @@ public class AvroUtils { GenericData.get().addLogicalTypeConversion(new AvroCoder.JodaTimestampConversion()); } + private static final ForLoadedType BYTES = new ForLoadedType(byte[].class); + private static final ForLoadedType JAVA_INSTANT = new ForLoadedType(java.time.Instant.class); + private static final ForLoadedType JAVA_LOCALE_DATE = + new ForLoadedType(java.time.LocalDate.class); + private static final ForLoadedType JODA_READABLE_INSTANT = + new ForLoadedType(ReadableInstant.class); + private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class); + // Unwrap an AVRO schema into the base type an whether it is nullable. public static class TypeWithNullability { public final org.apache.avro.Schema type; @@ -254,7 +267,10 @@ public AvroConvertType(boolean returnRawType) { @Override protected java.lang.reflect.Type convertDefault(TypeDescriptor type) { - if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class)) + || type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) { + return convertDateTime(type); + } else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { return byte[].class; } else { return super.convertDefault(type); @@ -282,10 +298,46 @@ protected StackManipulation convertDefault(TypeDescriptor type) { MethodInvocation.invoke( new ForLoadedType(GenericFixed.class) .getDeclaredMethods() - .filter( - ElementMatchers.named("bytes") - .and(ElementMatchers.returns(new ForLoadedType(byte[].class)))) + .filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES))) .getOnly())); + } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + // Generates the following code: + // return Instant.ofEpochMilli(value.toEpochMilli()) + StackManipulation onNotNull = + new Compound( + readValue, + MethodInvocation.invoke( + JAVA_INSTANT + .getDeclaredMethods() + .filter(ElementMatchers.named("toEpochMilli")) + .getOnly()), + MethodInvocation.invoke( + JODA_INSTANT + .getDeclaredMethods() + .filter( + ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli"))) + .getOnly())); + return shortCircuitReturnNull(readValue, onNotNull); + } else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) { + // Generates the following code: + // return Instant.ofEpochMilli(value.toEpochDay() * TimeUnit.DAYS.toMillis(1)) + StackManipulation onNotNull = + new Compound( + readValue, + MethodInvocation.invoke( + JAVA_LOCALE_DATE + .getDeclaredMethods() + .filter(ElementMatchers.named("toEpochDay")) + .getOnly()), + LongConstant.forValue(TimeUnit.DAYS.toMillis(1)), + Multiplication.LONG, + MethodInvocation.invoke( + JODA_INSTANT + .getDeclaredMethods() + .filter( + ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli"))) + .getOnly())); + return shortCircuitReturnNull(readValue, onNotNull); } return super.convertDefault(type); } @@ -303,25 +355,60 @@ protected TypeConversionsFactory getFactory() { @Override protected StackManipulation convertDefault(TypeDescriptor type) { - final ForLoadedType byteArrayType = new ForLoadedType(byte[].class); if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { // Generate the following code: - // return new T((byte[]) value); + // return new T((byte[]) value); ForLoadedType loadedType = new ForLoadedType(type.getRawType()); return new Compound( TypeCreation.of(loadedType), Duplication.SINGLE, // Load the parameter and cast it to a byte[]. readValue, - TypeCasting.to(byteArrayType), + TypeCasting.to(BYTES), // Create a new instance that wraps this byte[]. MethodInvocation.invoke( loadedType .getDeclaredMethods() .filter( - ElementMatchers.isConstructor() - .and(ElementMatchers.takesArguments(byteArrayType))) + ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES))) .getOnly())); + } else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) { + // Generates the following code: + // return java.time.Instant.ofEpochMilli(value.getMillis()) + StackManipulation onNotNull = + new Compound( + readValue, + MethodInvocation.invoke( + JODA_READABLE_INSTANT + .getDeclaredMethods() + .filter(ElementMatchers.named("getMillis")) + .getOnly()), + MethodInvocation.invoke( + JAVA_INSTANT + .getDeclaredMethods() + .filter( + ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli"))) + .getOnly())); + return shortCircuitReturnNull(readValue, onNotNull); + } else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) { + // Generates the following code: + // return java.time.LocalDate.ofEpochDay(value.getMillis() / TimeUnit.DAYS.toMillis(1)) + StackManipulation onNotNull = + new Compound( + readValue, + MethodInvocation.invoke( + JODA_READABLE_INSTANT + .getDeclaredMethods() + .filter(ElementMatchers.named("getMillis")) + .getOnly()), + LongConstant.forValue(TimeUnit.DAYS.toMillis(1)), + Division.LONG, + MethodInvocation.invoke( + JAVA_LOCALE_DATE + .getDeclaredMethods() + .filter(ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochDay"))) + .getOnly())); + return shortCircuitReturnNull(readValue, onNotNull); } return super.convertDefault(type); } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java index 730ccf60e0b9..ab3f4ec5398c 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java @@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; @@ -62,6 +63,7 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.extensions.avro.schemas.TestAvro; +import org.apache.beam.sdk.extensions.avro.schemas.TestAvroFactory; import org.apache.beam.sdk.extensions.avro.schemas.TestAvroNested; import org.apache.beam.sdk.extensions.avro.schemas.TestEnum; import org.apache.beam.sdk.extensions.avro.schemas.fixed4; @@ -105,7 +107,7 @@ public class AvroCoderTest { new DateTime().withDate(1997, 4, 25).withZone(DateTimeZone.UTC); private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42); private static final TestAvro AVRO_SPECIFIC_RECORD = - new TestAvro( + TestAvroFactory.newInstance( true, 43, 44L, @@ -120,6 +122,7 @@ public class AvroCoderTest { AVRO_NESTED_SPECIFIC_RECORD, ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD), ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD)); + private static final String VERSION_AVRO = Schema.class.getPackage().getImplementationVersion(); @DefaultCoder(AvroCoder.class) private static class Pojo { @@ -286,6 +289,7 @@ public void testKryoSerialization() throws Exception { // Kryo instantiation Kryo kryo = new Kryo(); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.addDefaultSerializer(AvroCoder.SerializableSchemaSupplier.class, JavaSerializer.class); // Serialization of object without any memoization ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream(); @@ -326,6 +330,10 @@ public void testPojoEncoding() throws Exception { @Test public void testSpecificRecordEncoding() throws Exception { + if (isBrokenMapComparison()) { + // Don't compare the map values because of AVRO-2943 + AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of()); + } AvroCoder coder = AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false); @@ -333,6 +341,12 @@ public void testSpecificRecordEncoding() throws Exception { CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD); } + private boolean isBrokenMapComparison() { + return VERSION_AVRO.equals("1.9.2") + || VERSION_AVRO.equals("1.10.2") + || VERSION_AVRO.equals("1.11.1"); + } + @Test public void testReflectRecordEncoding() throws Exception { AvroCoder coder = AvroCoder.of(TestAvro.class, true); @@ -353,10 +367,8 @@ public void testDisableReflectionEncoding() { fail("When userReclectApi is disable, schema should not be generated through reflection"); } catch (AvroRuntimeException e) { String message = - "avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: " - + "org.apache.avro.AvroRuntimeException: " - + "Not a Specific class: class org.apache.beam.sdk.extensions.avro.coders.AvroCoderTest$Pojo"; - assertEquals(message, e.getMessage()); + "Not a Specific class: class org.apache.beam.sdk.extensions.avro.coders.AvroCoderTest$Pojo"; + assertTrue(e.getMessage().contains(message)); } } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroGeneratedUserFactory.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroGeneratedUserFactory.java new file mode 100644 index 000000000000..066b65c5d5e6 --- /dev/null +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroGeneratedUserFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.avro.io; + +import java.lang.reflect.Constructor; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Create a {@link AvroGeneratedUser} instance with different constructors. */ +public class AvroGeneratedUserFactory { + private static final Logger LOG = LoggerFactory.getLogger(AvroGeneratedUserFactory.class); + private static final String VERSION_AVRO = Schema.class.getPackage().getImplementationVersion(); + + public static AvroGeneratedUser newInstance( + String name, Integer favoriteNumber, String favoriteColor) { + + if (VERSION_AVRO.equals("1.8.2")) { + return new AvroGeneratedUser(name, favoriteNumber, favoriteColor); + } else { + try { + Constructor constructor; + constructor = + AvroGeneratedUser.class.getDeclaredConstructor( + CharSequence.class, Integer.class, CharSequence.class); + + return (AvroGeneratedUser) constructor.newInstance(name, favoriteNumber, favoriteColor); + } catch (ReflectiveOperationException e) { + LOG.error(String.format("Fail to create a AvroGeneratedUser instance: %s", e.getMessage())); + return new AvroGeneratedUser(); // return an empty instance to fail the tests + } + } + } +} diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroIOTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroIOTest.java index 7456435948d3..158bf33cc875 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroIOTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroIOTest.java @@ -367,9 +367,9 @@ private void testWriteThenReadGeneratedClass( List values = ImmutableList.of( - (T) new AvroGeneratedUser("Bob", 256, null), - (T) new AvroGeneratedUser("Alice", 128, null), - (T) new AvroGeneratedUser("Ted", null, "white")); + (T) AvroGeneratedUserFactory.newInstance("Bob", 256, null), + (T) AvroGeneratedUserFactory.newInstance("Alice", 128, null), + (T) AvroGeneratedUserFactory.newInstance("Ted", null, "white")); writePipeline .apply(Create.of(values)) diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java index 066739ade69f..9dfdbd499e1e 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.avro.schemas; +import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; @@ -304,7 +306,7 @@ public String toString() { private static final LocalDate DATE = new LocalDate(1979, 3, 14); private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42); private static final TestAvro AVRO_SPECIFIC_RECORD = - new TestAvro( + TestAvroFactory.newInstance( true, 43, 44L, @@ -377,7 +379,9 @@ public void testSpecificRecordSchema() { @Test public void testPojoSchema() { - assertEquals(POJO_SCHEMA, new AvroRecordSchema().schemaFor(TypeDescriptor.of(AvroPojo.class))); + assertThat( + new AvroRecordSchema().schemaFor(TypeDescriptor.of(AvroPojo.class)), + equivalentTo(POJO_SCHEMA)); } @Test @@ -449,7 +453,7 @@ public void testRowToGenericRecord() { public void testPojoRecordToRow() { SerializableFunction toRow = new AvroRecordSchema().toRowFunction(TypeDescriptor.of(AvroPojo.class)); - assertEquals(ROW_FOR_POJO, toRow.apply(AVRO_POJO)); + assertThat(toRow.apply(AVRO_POJO), equivalentTo(ROW_FOR_POJO)); } @Test diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/TestAvroFactory.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/TestAvroFactory.java new file mode 100644 index 000000000000..dd549f7f2435 --- /dev/null +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/TestAvroFactory.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.avro.schemas; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Create a {@link TestAvro} instance with different constructors. */ +public class TestAvroFactory { + private static final Logger LOG = LoggerFactory.getLogger(TestAvroFactory.class); + private static final String VERSION_AVRO = Schema.class.getPackage().getImplementationVersion(); + + public static TestAvro newInstance( + Boolean boolNonNullable, + Integer integer, + Long aLong, + Float aFloat, + Double aDouble, + String string, + ByteBuffer bytes, + fixed4 fixed, + LocalDate date, + DateTime timestampMillis, + TestEnum testEnum, + TestAvroNested row, + List array, + Map map) { + + if (VERSION_AVRO.equals("1.8.2")) { + return new TestAvro( + boolNonNullable, + integer, + aLong, + aFloat, + aDouble, + string, + bytes, + fixed, + date, + timestampMillis, + testEnum, + row, + array, + map); + } else { + try { + Constructor constructor; + constructor = + TestAvro.class.getDeclaredConstructor( + Boolean.class, + Integer.class, + Long.class, + Float.class, + Double.class, + CharSequence.class, + ByteBuffer.class, + fixed4.class, + java.time.LocalDate.class, + java.time.Instant.class, + TestEnum.class, + TestAvroNested.class, + java.util.List.class, + java.util.Map.class); + + return (TestAvro) + constructor.newInstance( + boolNonNullable, + integer, + aLong, + aFloat, + aDouble, + string, + bytes, + fixed, + java.time.LocalDate.of(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth()), + java.time.Instant.ofEpochMilli(timestampMillis.getMillis()), + testEnum, + row, + array, + map); + } catch (ReflectiveOperationException e) { + LOG.error(String.format("Fail to create a TestAvro instance: %s", e.getMessage())); + return new TestAvro(); // return an empty instance to fail the tests + } + } + } +} diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 4e282fb7094b..fa3d3343ac6e 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; +import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUserFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -156,7 +157,7 @@ public void testNullableArrayFieldToBeamArrayField() { ReflectData.makeNullable( org.apache.avro.Schema.createArray(org.apache.avro.Schema.create(Type.INT))), "", - null); + (Object) null); Field expectedBeamField = Field.nullable("arrayField", FieldType.array(FieldType.INT32)); @@ -174,7 +175,7 @@ public void testNullableBeamArrayFieldToAvroField() { ReflectData.makeNullable( org.apache.avro.Schema.createArray(org.apache.avro.Schema.create(Type.INT))), "", - null); + (Object) null); org.apache.avro.Schema.Field avroField = AvroUtils.toAvroField(beamField, "ignored"); assertEquals(expectedAvroField, avroField); @@ -184,9 +185,10 @@ private static List getAvroSubSchemaFields() { List fields = Lists.newArrayList(); fields.add( new org.apache.avro.Schema.Field( - "bool", org.apache.avro.Schema.create(Type.BOOLEAN), "", null)); + "bool", org.apache.avro.Schema.create(Type.BOOLEAN), "", (Object) null)); fields.add( - new org.apache.avro.Schema.Field("int", org.apache.avro.Schema.create(Type.INT), "", null)); + new org.apache.avro.Schema.Field( + "int", org.apache.avro.Schema.create(Type.INT), "", (Object) null)); return fields; } @@ -402,21 +404,24 @@ public void testNullableFieldInAvroSchema() { List fields = Lists.newArrayList(); fields.add( new org.apache.avro.Schema.Field( - "int", ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), "", null)); + "int", + ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "array", org.apache.avro.Schema.createArray( ReflectData.makeNullable(org.apache.avro.Schema.create(Type.BYTES))), "", - null)); + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "map", org.apache.avro.Schema.createMap( ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), "", - null)); + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "enum", @@ -424,7 +429,7 @@ public void testNullableFieldInAvroSchema() { org.apache.avro.Schema.createEnum( "fruit", "", "", ImmutableList.of("banana", "apple", "pear"))), "", - null)); + (Object) null)); org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields); @@ -472,21 +477,24 @@ public void testNullableFieldsInBeamSchema() { List fields = Lists.newArrayList(); fields.add( new org.apache.avro.Schema.Field( - "int", ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), "", null)); + "int", + ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT)), + "", + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "array", org.apache.avro.Schema.createArray( ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), "", - null)); + (Object) null)); fields.add( new org.apache.avro.Schema.Field( "map", org.apache.avro.Schema.createMap( ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), "", - null)); + (Object) null)); org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields); assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema)); @@ -522,7 +530,7 @@ public void testUnionFieldInAvroSchema() { fields.add( new org.apache.avro.Schema.Field( - "union", org.apache.avro.Schema.createUnion(unionFields), "", null)); + "union", org.apache.avro.Schema.createUnion(unionFields), "", (Object) null)); org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields); OneOfType oneOfType = @@ -549,7 +557,7 @@ public void testUnionFieldInBeamSchema() { unionFields.add(org.apache.avro.Schema.create(Type.STRING)); fields.add( new org.apache.avro.Schema.Field( - "union", org.apache.avro.Schema.createUnion(unionFields), "", null)); + "union", org.apache.avro.Schema.createUnion(unionFields), "", (Object) null)); org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields); GenericRecord expectedGenericRecord = @@ -796,7 +804,7 @@ public void testAvroSchemaCoders() { assertTrue(records.hasSchema()); CoderProperties.coderSerializable(records.getCoder()); - AvroGeneratedUser user = new AvroGeneratedUser("foo", 42, "green"); + AvroGeneratedUser user = AvroGeneratedUserFactory.newInstance("foo", 42, "green"); PCollection users = pipeline.apply(Create.of(user).withCoder(AvroCoder.of(AvroGeneratedUser.class))); assertFalse(users.hasSchema());