Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .test-infra/jenkins/job_PostCommit_Java_Avro_Versions.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


// This job runs the Java tests that depends on Avro against different Avro API versions
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_Avro_Versions', 'Run PostCommit_Java_Avro_Versions',
'Java Avro Versions Post Commit Tests', this) {

description('Java Avro Versions Post Commit Tests')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240)

// Publish all test results to Jenkins
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

// Gradle goals for this job.
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(":javaAvroVersionsTest")
commonJobProperties.setGradleSwitches(delegate)
// Specify maven home on Jenkins, needed by Maven archetype integration tests.
switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.4')
}
}
}
1 change: 1 addition & 0 deletions .test-infra/jenkins/job_PreCommit_Java.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import PrecommitJobBuilder

// exclude paths with their own PreCommit tasks
def excludePaths = [
'extensions/avro',
'extensions/sql',
'io/amazon-web-services',
'io/amazon-web-services2',
Expand Down
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ tasks.register("javaHadoopVersionsTest") {
dependsOn(":runners:spark:3:hadoopVersionsTest")
}

tasks.register("javaAvroVersionsTest") {
dependsOn(":sdks:java:extensions:avro:avroVersionsTest")
}

tasks.register("sqlPostCommit") {
dependsOn(":sdks:java:extensions:sql:postCommit")
dependsOn(":sdks:java:extensions:sql:jdbc:postCommit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ public T convert(TypeDescriptor typeDescriptor) {
}
}

protected StackManipulation shortCircuitReturnNull(
StackManipulation readValue, StackManipulation onNotNull) {
return new ShortCircuitReturnNull(readValue, onNotNull);
}

protected abstract T convertArray(TypeDescriptor<?> type);

protected abstract T convertIterable(TypeDescriptor<?> type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ private static boolean fieldsEquivalent(Object expected, Object actual, FieldTyp
(Map<Object, Object>) expected,
(Map<Object, Object>) actual,
fieldType.getMapValueType());
} else if (fieldType.getTypeName() == TypeName.ROW) {
return rowsEquivalent((Row) expected, (Row) actual);
} else {
return Objects.equals(expected, actual);
}
Expand Down
91 changes: 88 additions & 3 deletions sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import java.util.stream.Collectors

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -27,6 +29,14 @@ applyAvroNature()

description = "Apache Beam :: SDKs :: Java :: Extensions :: Avro"

def avroVersions = [
'192' : "1.9.2",
'1102': "1.10.2",
'1111': "1.11.1",
]

avroVersions.each { k, v -> configurations.create("avroVersion$k") }

// Exclude tests that need a runner
test {
systemProperty "beamUseDummyRunner", "true"
Expand All @@ -38,19 +48,94 @@ test {
dependencies {
implementation library.java.byte_buddy
implementation library.java.vendored_guava_26_0_jre
implementation (project(path: ":sdks:java:core", configuration: "shadow")) {
implementation(project(path: ":sdks:java:core", configuration: "shadow")) {
// Exclude Avro dependencies from "core" since Avro support moved to this extension
exclude group: "org.apache.avro", module: "avro"
}
implementation library.java.error_prone_annotations
implementation library.java.avro
implementation library.java.joda_time
testImplementation (project(path: ":sdks:java:core", configuration: "shadowTest")) {
testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) {
// Exclude Avro dependencies from "core" since Avro support moved to this extension
exclude group: "org.apache.avro", module: "avro"
}
testImplementation library.java.avro_tests
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly library.java.slf4j_jdk14
}
avroVersions.each {
"avroVersion$it.key" "org.apache.avro:avro:$it.value"
"avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
}
}

avroVersions.each { kv ->
configurations."avroVersion$kv.key" {
resolutionStrategy {
force "org.apache.avro:avro:$kv.value"
}
}

sourceSets {
"avro${kv.key}" {
java {
srcDirs "build/generated/sources/avro${kv.key}/test/java"
}

compileClasspath = configurations."avroVersion$kv.key" + sourceSets.test.output + sourceSets.test.compileClasspath
runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath
}
}

"compileAvro${kv.key}Java" {
checkerFramework {
skipCheckerFramework = true
}
}

"spotbugsAvro${kv.key}" {
ignoreFailures = true
}

"generateAvro${kv.key}AvroJava" {
dependsOn "generateAvroClasses${kv.key}"
}

task "avroVersion${kv.key}Test"(type: Test) {
group = "Verification"
description = "Runs Avro extension tests with Avro version $kv.value"
outputs.upToDateWhen { false }
classpath = sourceSets."avro${kv.key}".runtimeClasspath

include '**/*.class'
exclude '**/AvroIOTest$NeedsRunnerTests$*.class'

dependsOn "generateAvroClasses${kv.key}"
}

task "generateAvroClasses${kv.key}"(type: JavaExec) {
group = "build"
description = "Generate Avro classes for Avro version $kv.value"
classpath = configurations."avroVersion$kv.key"
main = "org.apache.avro.tool.Main"
args = [
"compile",
"schema",
"src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc",
"src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc",
"build/generated/sources/avro${kv.key}/test/java"
]
}
}

task avroVersionsTest {
group = "Verification"
description = 'Runs Avro extension tests with different Avro API versions'
dependsOn createTaskNames(avroVersions, "Test")
}

static def createTaskNames(Map<String, String> prefixMap, String suffix) {
return prefixMap.keySet().stream()
.map { version -> "avroVersion${version}${suffix}" }
.collect(Collectors.toList())
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private Object readResolve() throws IOException, ClassNotFoundException {
* Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's
* serialization and hence is able to encode the {@link Schema} object directly.
*/
private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
// writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
// here:
// http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import net.bytebuddy.implementation.bytecode.Division;
import net.bytebuddy.implementation.bytecode.Duplication;
import net.bytebuddy.implementation.bytecode.Multiplication;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
import net.bytebuddy.implementation.bytecode.TypeCreation;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import net.bytebuddy.implementation.bytecode.constant.LongConstant;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.avro.AvroRuntimeException;
Expand Down Expand Up @@ -144,13 +148,22 @@
"rawtypes"
})
public class AvroUtils {

static {
// This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling
// of DateTime types.
SpecificData.get().addLogicalTypeConversion(new AvroCoder.JodaTimestampConversion());
GenericData.get().addLogicalTypeConversion(new AvroCoder.JodaTimestampConversion());
}

private static final ForLoadedType BYTES = new ForLoadedType(byte[].class);
private static final ForLoadedType JAVA_INSTANT = new ForLoadedType(java.time.Instant.class);
private static final ForLoadedType JAVA_LOCALE_DATE =
new ForLoadedType(java.time.LocalDate.class);
private static final ForLoadedType JODA_READABLE_INSTANT =
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);

// Unwrap an AVRO schema into the base type an whether it is nullable.
public static class TypeWithNullability {
public final org.apache.avro.Schema type;
Expand Down Expand Up @@ -254,7 +267,10 @@ public AvroConvertType(boolean returnRawType) {

@Override
protected java.lang.reflect.Type convertDefault(TypeDescriptor<?> type) {
if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class))
|| type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) {
return convertDateTime(type);
} else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
return byte[].class;
} else {
return super.convertDefault(type);
Expand Down Expand Up @@ -282,10 +298,46 @@ protected StackManipulation convertDefault(TypeDescriptor<?> type) {
MethodInvocation.invoke(
new ForLoadedType(GenericFixed.class)
.getDeclaredMethods()
.filter(
ElementMatchers.named("bytes")
.and(ElementMatchers.returns(new ForLoadedType(byte[].class))))
.filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES)))
.getOnly()));
} else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
// Generates the following code:
// return Instant.ofEpochMilli(value.toEpochMilli())
StackManipulation onNotNull =
new Compound(
readValue,
MethodInvocation.invoke(
JAVA_INSTANT
.getDeclaredMethods()
.filter(ElementMatchers.named("toEpochMilli"))
.getOnly()),
MethodInvocation.invoke(
JODA_INSTANT
.getDeclaredMethods()
.filter(
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
.getOnly()));
return shortCircuitReturnNull(readValue, onNotNull);
} else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) {
// Generates the following code:
// return Instant.ofEpochMilli(value.toEpochDay() * TimeUnit.DAYS.toMillis(1))
StackManipulation onNotNull =
new Compound(
readValue,
MethodInvocation.invoke(
JAVA_LOCALE_DATE
.getDeclaredMethods()
.filter(ElementMatchers.named("toEpochDay"))
.getOnly()),
LongConstant.forValue(TimeUnit.DAYS.toMillis(1)),
Multiplication.LONG,
MethodInvocation.invoke(
JODA_INSTANT
.getDeclaredMethods()
.filter(
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
.getOnly()));
return shortCircuitReturnNull(readValue, onNotNull);
}
return super.convertDefault(type);
}
Expand All @@ -303,25 +355,60 @@ protected TypeConversionsFactory getFactory() {

@Override
protected StackManipulation convertDefault(TypeDescriptor<?> type) {
final ForLoadedType byteArrayType = new ForLoadedType(byte[].class);
if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
// Generate the following code:
// return new T((byte[]) value);
// return new T((byte[]) value);
ForLoadedType loadedType = new ForLoadedType(type.getRawType());
return new Compound(
TypeCreation.of(loadedType),
Duplication.SINGLE,
// Load the parameter and cast it to a byte[].
readValue,
TypeCasting.to(byteArrayType),
TypeCasting.to(BYTES),
// Create a new instance that wraps this byte[].
MethodInvocation.invoke(
loadedType
.getDeclaredMethods()
.filter(
ElementMatchers.isConstructor()
.and(ElementMatchers.takesArguments(byteArrayType)))
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES)))
.getOnly()));
} else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
// Generates the following code:
// return java.time.Instant.ofEpochMilli(value.getMillis())
StackManipulation onNotNull =
new Compound(
readValue,
MethodInvocation.invoke(
JODA_READABLE_INSTANT
.getDeclaredMethods()
.filter(ElementMatchers.named("getMillis"))
.getOnly()),
MethodInvocation.invoke(
JAVA_INSTANT
.getDeclaredMethods()
.filter(
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
.getOnly()));
return shortCircuitReturnNull(readValue, onNotNull);
} else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) {
// Generates the following code:
// return java.time.LocalDate.ofEpochDay(value.getMillis() / TimeUnit.DAYS.toMillis(1))
StackManipulation onNotNull =
new Compound(
readValue,
MethodInvocation.invoke(
JODA_READABLE_INSTANT
.getDeclaredMethods()
.filter(ElementMatchers.named("getMillis"))
.getOnly()),
LongConstant.forValue(TimeUnit.DAYS.toMillis(1)),
Division.LONG,
MethodInvocation.invoke(
JAVA_LOCALE_DATE
.getDeclaredMethods()
.filter(ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochDay")))
.getOnly()));
return shortCircuitReturnNull(readValue, onNotNull);
}
return super.convertDefault(type);
}
Expand Down
Loading