From 71b7d8bb2f0f117b33072ef3aa3201980185b6e2 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 28 Dec 2020 09:59:01 -0800 Subject: [PATCH 1/4] [BEAM-10925] Load SQL UDFs from jar. --- sdks/java/extensions/sql/zetasql/build.gradle | 14 ++ .../zetasql/translation/JavaUdfLoader.java | 187 ++++++++++++++++++ .../translation/UserFunctionDefinitions.java | 9 +- .../translation/JavaUdfLoaderTest.java | 109 ++++++++++ .../zetasql/udf-test-provider/build.gradle | 34 ++++ .../sql/zetasql/provider/UdfTestProvider.java | 40 ++++ .../sql/zetasql/provider/package-info.java | 20 ++ settings.gradle | 1 + 8 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java create mode 100644 sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java create mode 100644 sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle create mode 100644 sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java create mode 100644 sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java diff --git a/sdks/java/extensions/sql/zetasql/build.gradle b/sdks/java/extensions/sql/zetasql/build.gradle index 9589301e90ca..137a92262ea7 100644 --- a/sdks/java/extensions/sql/zetasql/build.gradle +++ b/sdks/java/extensions/sql/zetasql/build.gradle @@ -33,6 +33,7 @@ dependencies { compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom) compile project(":sdks:java:core") compile project(":sdks:java:extensions:sql") + compile project(":sdks:java:extensions:sql:udf") compile library.java.vendored_calcite_1_20_0 compile library.java.guava compile library.java.grpc_api @@ -51,5 +52,18 @@ dependencies { testCompile library.java.mockito_core testCompile library.java.quickcheck_core testRuntimeClasspath library.java.slf4j_jdk14 + testCompileOnly project(":sdks:java:extensions:sql:zetasql:udf-test-provider") } +task emptyJar(type: Jar) { + archiveBaseName = "${project.archivesBaseName}-empty-jar" + from fileTree(dir: getTemporaryDir().createNewFile().toString()) +} + +test { + dependsOn emptyJar + // Pass jars used by Java UDF tests via system properties. + evaluationDependsOn(":sdks:java:extensions:sql:zetasql:udf-test-provider") // Needed to resolve jarPath. + systemProperty "beam.sql.udf.test.jar_path", project(":sdks:java:extensions:sql:zetasql:udf-test-provider").jarPath + systemProperty "beam.sql.udf.test.empty_jar_path", emptyJar.archivePath +} diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java new file mode 100644 index 000000000000..78d4d1fa7212 --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.zetasql.translation; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.ProviderNotFoundException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; +import org.apache.beam.sdk.extensions.sql.udf.UdfProvider; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.commons.codec.digest.DigestUtils; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Loads {@link UdfProvider} implementations from user-provided jars. + * + *

All UDFs are loaded and cached for each jar to mitigate IO costs. + */ +public class JavaUdfLoader { + private static final Logger LOG = LoggerFactory.getLogger(JavaUdfLoader.class); + + /** + * Maps the external jar location to the functions the jar defines. Static so it can persist + * across multiple SQL transforms. + */ + private static final Map cache = new HashMap<>(); + + private static final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); + + /** + * Load a user-defined scalar function from the specified jar. + * + *

WARNING: The first time a jar is loaded, it is added to the thread's + * context {@link ClassLoader} so that the jar can be staged by the runner. + */ + public ScalarFn loadScalarFunction(List functionPath, String jarPath) { + String functionFullName = String.join(".", functionPath); + try { + UserFunctionDefinitions functionDefinitions = loadJar(jarPath); + if (!functionDefinitions.javaScalarFunctions().containsKey(functionPath)) { + throw new IllegalArgumentException( + String.format( + "No implementation of scalar function %s found in %s.%n" + + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).%n" + + " 2. Add function %s to the class's userDefinedScalarFunctions implementation.", + functionFullName, + jarPath, + UdfProvider.class.getSimpleName(), + UdfProvider.class.getSimpleName(), + functionFullName)); + } + return functionDefinitions.javaScalarFunctions().get(functionPath); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed to load user-defined scalar function %s from %s", functionFullName, jarPath), + e); + } + } + + /** + * Creates a temporary local copy of the file at {@code inputPath}, and returns a handle to the + * local copy. + */ + private File downloadFile(String inputPath, String mimeType) throws IOException { + Preconditions.checkArgument(!inputPath.isEmpty(), "Path cannot be empty."); + ResourceId inputResource = FileSystems.matchNewResource(inputPath, false /* is directory */); + try (ReadableByteChannel inputChannel = FileSystems.open(inputResource)) { + File outputFile = File.createTempFile("sql-udf-", inputResource.getFilename()); + ResourceId outputResource = + FileSystems.matchNewResource(outputFile.getAbsolutePath(), false /* is directory */); + try (WritableByteChannel outputChannel = FileSystems.create(outputResource, mimeType)) { + ByteStreams.copy(inputChannel, outputChannel); + } + // Compute and log checksum. + try (InputStream inputStream = new FileInputStream(outputFile)) { + LOG.info( + "Copied {} to {} with md5 hash {}.", + inputPath, + outputFile.getAbsolutePath(), + DigestUtils.md5Hex(inputStream)); + } + return outputFile; + } + } + + private ClassLoader createAndSetClassLoader(String inputJarPath) throws IOException { + File tmpJar = downloadFile(inputJarPath, "application/java-archive"); + // Set the thread's context class loader so that the jar can be staged by the runner. + Thread.currentThread() + .setContextClassLoader( + new URLClassLoader( + new URL[] {tmpJar.toURI().toURL()}, ReflectHelpers.findClassLoader())); + // Return a class loader that isolates the target jar from other UDF jars that might have been + // loaded previously. + return new URLClassLoader(new URL[] {tmpJar.toURI().toURL()}, originalClassLoader); + } + + @VisibleForTesting + Iterator getUdfProviders(ClassLoader classLoader) throws IOException { + return ServiceLoader.load(UdfProvider.class, classLoader).iterator(); + } + + private UserFunctionDefinitions loadJar(String jarPath) throws IOException { + if (cache.containsKey(jarPath)) { + LOG.debug("Using cached function definitions from {}", jarPath); + return cache.get(jarPath); + } + + Map, ScalarFn> scalarFunctions = new HashMap<>(); + ClassLoader classLoader = createAndSetClassLoader(jarPath); + Iterator providers = getUdfProviders(classLoader); + int providersCount = 0; + while (providers.hasNext()) { + providersCount++; + UdfProvider provider = providers.next(); + provider + .userDefinedScalarFunctions() + .forEach( + (functionName, implementation) -> { + List functionPath = ImmutableList.copyOf(functionName.split("\\.")); + if (scalarFunctions.containsKey(functionPath)) { + throw new IllegalArgumentException( + String.format( + "Found multiple definitions of scalar function %s in %s.", + functionName, jarPath)); + } + scalarFunctions.put(functionPath, implementation); + }); + } + if (providersCount == 0) { + throw new ProviderNotFoundException( + String.format( + "No %s implementation found in %s. Create a class implementing %s and annotate it with @AutoService(%s.class).", + UdfProvider.class.getSimpleName(), + jarPath, + UdfProvider.class.getSimpleName(), + UdfProvider.class.getSimpleName())); + } + LOG.info( + "Loaded {} implementations of {} from {} with {} scalar function(s).", + providersCount, + UdfProvider.class.getSimpleName(), + jarPath, + scalarFunctions.size()); + UserFunctionDefinitions userFunctionDefinitions = + UserFunctionDefinitions.newBuilder() + .setJavaScalarFunctions(ImmutableMap.copyOf(scalarFunctions)) + .build(); + cache.put(jarPath, userFunctionDefinitions); + return userFunctionDefinitions; + } +} diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java index 395ad5e7a639..32d07237b42f 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java @@ -21,6 +21,7 @@ import com.google.zetasql.resolvedast.ResolvedNode; import com.google.zetasql.resolvedast.ResolvedNodes; import java.util.List; +import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Holds user defined function definitions. */ @@ -36,6 +37,8 @@ public abstract class UserFunctionDefinitions { */ public abstract ImmutableMap, ResolvedNode> sqlTableValuedFunctions(); + public abstract ImmutableMap, ScalarFn> javaScalarFunctions(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setSqlScalarFunctions( @@ -44,12 +47,16 @@ public abstract Builder setSqlScalarFunctions( public abstract Builder setSqlTableValuedFunctions( ImmutableMap, ResolvedNode> sqlTableValuedFunctions); + public abstract Builder setJavaScalarFunctions( + ImmutableMap, ScalarFn> javaScalarFunctions); + public abstract UserFunctionDefinitions build(); } public static Builder newBuilder() { return new AutoValue_UserFunctionDefinitions.Builder() .setSqlScalarFunctions(ImmutableMap.of()) - .setSqlTableValuedFunctions(ImmutableMap.of()); + .setSqlTableValuedFunctions(ImmutableMap.of()) + .setJavaScalarFunctions(ImmutableMap.of()); } } diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java new file mode 100644 index 000000000000..b68a872f45b3 --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.zetasql.translation; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.file.ProviderNotFoundException; +import java.util.Collections; +import java.util.Iterator; +import org.apache.beam.sdk.extensions.sql.udf.UdfProvider; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link JavaUdfLoader}. */ +@RunWith(JUnit4.class) +public class JavaUdfLoaderTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private final String jarPathProperty = "beam.sql.udf.test.jar_path"; + private final String emptyJarPathProperty = "beam.sql.udf.test.empty_jar_path"; + + private final @Nullable String jarPath = System.getProperty(jarPathProperty); + private final @Nullable String emptyJarPath = System.getProperty(emptyJarPathProperty); + + private final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); + + @Before + public void setUp() { + if (jarPath == null) { + fail( + String.format( + "System property %s must be set to run %s.", + jarPathProperty, JavaUdfLoaderTest.class.getSimpleName())); + } + if (emptyJarPath == null) { + fail( + String.format( + "System property %s must be set to run %s.", + emptyJarPathProperty, JavaUdfLoaderTest.class.getSimpleName())); + } + } + + /** + * Test that the parent classloader does not load any implementations of {@link UdfProvider}. This + * is important because we do not want to pollute the user's namespace. + */ + @Test + public void testClassLoaderHasNoUdfProviders() throws IOException { + JavaUdfLoader udfLoader = new JavaUdfLoader(); + Iterator udfProviders = udfLoader.getUdfProviders(originalClassLoader); + assertFalse(udfProviders.hasNext()); + } + + @Test + @SuppressWarnings( + "nullness") // We check if jarPath is null in setUp, but the checker framework doesn't know. + public void testLoadScalarFunction() { + JavaUdfLoader udfLoader = new JavaUdfLoader(); + udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), jarPath); + } + + @Test + @SuppressWarnings( + "nullness") // We check if jarPath is null in setUp, but the checker framework doesn't know. + public void testLoadUnregisteredScalarFunctionThrowsRuntimeException() { + JavaUdfLoader udfLoader = new JavaUdfLoader(); + thrown.expect(RuntimeException.class); + thrown.expectMessage( + String.format("No implementation of scalar function notRegistered found in %s.", jarPath)); + udfLoader.loadScalarFunction(Collections.singletonList("notRegistered"), jarPath); + } + + @Test + @SuppressWarnings( + "nullness") // We check if jarPath and emptyJarPath are null in setUp, but the checker + // framework doesn't know. + public void testJarMissingUdfProviderThrowsProviderNotFoundException() { + JavaUdfLoader udfLoader = new JavaUdfLoader(); + thrown.expect(ProviderNotFoundException.class); + thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath)); + // Load from an inhabited jar first so we can make sure we load UdfProviders in isolation + // from other jars. + udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), jarPath); + udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), emptyJarPath); + } +} diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle b/sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle new file mode 100644 index 000000000000..b1a2c5877e4e --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'org.apache.beam.module' +} + +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.sql.zetasql.provider') + +description = "Apache Beam :: SDKs :: Java :: Extensions :: SQL :: ZetaSQL :: UDF test provider" +ext.summary = "Java UDFs for testing. This project must be built separately from its parent so the UDF provider is not included in the context classloader for tests." + +project.ext.jarPath = jar.archivePath + +dependencies { + // No dependency (direct or transitive) on :sdks:java:core. + compile project(":sdks:java:extensions:sql:udf") + compile library.java.guava +} diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java b/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java new file mode 100644 index 000000000000..96c1a7670754 --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.zetasql.provider; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; +import org.apache.beam.sdk.extensions.sql.udf.UdfProvider; + +/** Defines Java UDFs for use in tests. */ +@AutoService(UdfProvider.class) +public class UdfTestProvider implements UdfProvider { + @Override + public Map userDefinedScalarFunctions() { + return ImmutableMap.of("helloWorld", new HelloWorldFn()); + } + + public static class HelloWorldFn extends ScalarFn { + @ApplyMethod + public String helloWorld() { + return "Hello world!"; + } + } +} diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java b/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java new file mode 100644 index 000000000000..8e222e327a44 --- /dev/null +++ b/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Package containing UDF providers for testing. */ +package org.apache.beam.sdk.extensions.sql.zetasql.provider; diff --git a/settings.gradle b/settings.gradle index 8ccf0bc54676..d5c9212ec1b4 100644 --- a/settings.gradle +++ b/settings.gradle @@ -124,6 +124,7 @@ include ":sdks:java:extensions:sql:shell" include ":sdks:java:extensions:sql:hcatalog" include ":sdks:java:extensions:sql:datacatalog" include ":sdks:java:extensions:sql:zetasql" +include ":sdks:java:extensions:sql:zetasql:udf-test-provider" include ":sdks:java:extensions:sql:expansion-service" include ":sdks:java:extensions:sql:udf" include ":sdks:java:extensions:zetasketch" From 25b890379a492e01b4e48e90bf716fb51eb846bb Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 28 Dec 2020 18:52:17 -0800 Subject: [PATCH 2/4] Move JavaUdfLoader from zetasql -> sql. --- sdks/java/extensions/sql/build.gradle | 14 +++++++ .../sql/impl/JavaUdfDefinitions.java | 40 +++++++++++++++++++ .../extensions/sql/impl}/JavaUdfLoader.java | 18 ++++----- .../sql/impl}/JavaUdfLoaderTest.java | 2 +- .../udf-test-provider/build.gradle | 4 +- .../sql}/provider/UdfTestProvider.java | 2 +- .../sql}/provider/package-info.java | 2 +- sdks/java/extensions/sql/zetasql/build.gradle | 14 ------- .../translation/UserFunctionDefinitions.java | 9 +---- settings.gradle | 2 +- 10 files changed, 70 insertions(+), 37 deletions(-) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java rename sdks/java/extensions/sql/{zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation => src/main/java/org/apache/beam/sdk/extensions/sql/impl}/JavaUdfLoader.java (92%) rename sdks/java/extensions/sql/{zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation => src/test/java/org/apache/beam/sdk/extensions/sql/impl}/JavaUdfLoaderTest.java (98%) rename sdks/java/extensions/sql/{zetasql => }/udf-test-provider/build.gradle (95%) rename sdks/java/extensions/sql/{zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql => udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql}/provider/UdfTestProvider.java (95%) rename sdks/java/extensions/sql/{zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql => udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql}/provider/package-info.java (93%) diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index aa52d080fdf6..3b12082034ae 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -90,6 +90,7 @@ dependencies { testCompile library.java.testcontainers_kafka testCompile library.java.google_cloud_bigtable_emulator testCompile project(path: ":sdks:java:io:mongodb", configuration: "testRuntime") + testCompileOnly project(":sdks:java:extensions:sql:udf-test-provider") testRuntimeClasspath library.java.slf4j_jdk14 hadoopVersions.each {kv -> "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value" @@ -224,3 +225,16 @@ hadoopVersions.each { kv -> include '**/*Test.class' } } + +task emptyJar(type: Jar) { + archiveBaseName = "${project.archivesBaseName}-empty-jar" + from fileTree(dir: getTemporaryDir().createNewFile().toString()) +} + +test { + dependsOn emptyJar + // Pass jars used by Java UDF tests via system properties. + evaluationDependsOn(":sdks:java:extensions:sql:udf-test-provider") // Needed to resolve jarPath. + systemProperty "beam.sql.udf.test.jar_path", project(":sdks:java:extensions:sql:udf-test-provider").jarPath + systemProperty "beam.sql.udf.test.empty_jar_path", emptyJar.archivePath +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java new file mode 100644 index 000000000000..0002babee374 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import com.google.auto.value.AutoValue; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Holds user defined function definitions. */ +@AutoValue +public abstract class JavaUdfDefinitions { + public abstract ImmutableMap, ScalarFn> scalarFunctions(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setScalarFunctions(ImmutableMap, ScalarFn> value); + + public abstract JavaUdfDefinitions build(); + } + + public static Builder newBuilder() { + return new AutoValue_JavaUdfDefinitions.Builder().setScalarFunctions(ImmutableMap.of()); + } +} diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java similarity index 92% rename from sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java index 78d4d1fa7212..254c85d7203a 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.zetasql.translation; +package org.apache.beam.sdk.extensions.sql.impl; import java.io.File; import java.io.FileInputStream; @@ -57,7 +57,7 @@ public class JavaUdfLoader { * Maps the external jar location to the functions the jar defines. Static so it can persist * across multiple SQL transforms. */ - private static final Map cache = new HashMap<>(); + private static final Map cache = new HashMap<>(); private static final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); @@ -70,8 +70,8 @@ public class JavaUdfLoader { public ScalarFn loadScalarFunction(List functionPath, String jarPath) { String functionFullName = String.join(".", functionPath); try { - UserFunctionDefinitions functionDefinitions = loadJar(jarPath); - if (!functionDefinitions.javaScalarFunctions().containsKey(functionPath)) { + JavaUdfDefinitions functionDefinitions = loadJar(jarPath); + if (!functionDefinitions.scalarFunctions().containsKey(functionPath)) { throw new IllegalArgumentException( String.format( "No implementation of scalar function %s found in %s.%n" @@ -83,7 +83,7 @@ public ScalarFn loadScalarFunction(List functionPath, String jarPath) { UdfProvider.class.getSimpleName(), functionFullName)); } - return functionDefinitions.javaScalarFunctions().get(functionPath); + return functionDefinitions.scalarFunctions().get(functionPath); } catch (IOException e) { throw new RuntimeException( String.format( @@ -135,7 +135,7 @@ Iterator getUdfProviders(ClassLoader classLoader) throws IOExceptio return ServiceLoader.load(UdfProvider.class, classLoader).iterator(); } - private UserFunctionDefinitions loadJar(String jarPath) throws IOException { + private JavaUdfDefinitions loadJar(String jarPath) throws IOException { if (cache.containsKey(jarPath)) { LOG.debug("Using cached function definitions from {}", jarPath); return cache.get(jarPath); @@ -177,9 +177,9 @@ private UserFunctionDefinitions loadJar(String jarPath) throws IOException { UdfProvider.class.getSimpleName(), jarPath, scalarFunctions.size()); - UserFunctionDefinitions userFunctionDefinitions = - UserFunctionDefinitions.newBuilder() - .setJavaScalarFunctions(ImmutableMap.copyOf(scalarFunctions)) + JavaUdfDefinitions userFunctionDefinitions = + JavaUdfDefinitions.newBuilder() + .setScalarFunctions(ImmutableMap.copyOf(scalarFunctions)) .build(); cache.put(jarPath, userFunctionDefinitions); return userFunctionDefinitions; diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java similarity index 98% rename from sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java index b68a872f45b3..c6e090176770 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoaderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.zetasql.translation; +package org.apache.beam.sdk.extensions.sql.impl; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle b/sdks/java/extensions/sql/udf-test-provider/build.gradle similarity index 95% rename from sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle rename to sdks/java/extensions/sql/udf-test-provider/build.gradle index b1a2c5877e4e..6769eba1d994 100644 --- a/sdks/java/extensions/sql/zetasql/udf-test-provider/build.gradle +++ b/sdks/java/extensions/sql/udf-test-provider/build.gradle @@ -20,9 +20,9 @@ plugins { id 'org.apache.beam.module' } -applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.sql.zetasql.provider') +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.sql.provider') -description = "Apache Beam :: SDKs :: Java :: Extensions :: SQL :: ZetaSQL :: UDF test provider" +description = "Apache Beam :: SDKs :: Java :: Extensions :: SQL :: UDF test provider" ext.summary = "Java UDFs for testing. This project must be built separately from its parent so the UDF provider is not included in the context classloader for tests." project.ext.jarPath = jar.archivePath diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java similarity index 95% rename from sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java rename to sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java index 96c1a7670754..1879e0d80680 100644 --- a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/UdfTestProvider.java +++ b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.zetasql.provider; +package org.apache.beam.sdk.extensions.sql.provider; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; diff --git a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java similarity index 93% rename from sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java rename to sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java index 8e222e327a44..0ca46e07eccd 100644 --- a/sdks/java/extensions/sql/zetasql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/provider/package-info.java +++ b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java @@ -17,4 +17,4 @@ */ /** Package containing UDF providers for testing. */ -package org.apache.beam.sdk.extensions.sql.zetasql.provider; +package org.apache.beam.sdk.extensions.sql.provider; diff --git a/sdks/java/extensions/sql/zetasql/build.gradle b/sdks/java/extensions/sql/zetasql/build.gradle index 137a92262ea7..9589301e90ca 100644 --- a/sdks/java/extensions/sql/zetasql/build.gradle +++ b/sdks/java/extensions/sql/zetasql/build.gradle @@ -33,7 +33,6 @@ dependencies { compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom) compile project(":sdks:java:core") compile project(":sdks:java:extensions:sql") - compile project(":sdks:java:extensions:sql:udf") compile library.java.vendored_calcite_1_20_0 compile library.java.guava compile library.java.grpc_api @@ -52,18 +51,5 @@ dependencies { testCompile library.java.mockito_core testCompile library.java.quickcheck_core testRuntimeClasspath library.java.slf4j_jdk14 - testCompileOnly project(":sdks:java:extensions:sql:zetasql:udf-test-provider") } -task emptyJar(type: Jar) { - archiveBaseName = "${project.archivesBaseName}-empty-jar" - from fileTree(dir: getTemporaryDir().createNewFile().toString()) -} - -test { - dependsOn emptyJar - // Pass jars used by Java UDF tests via system properties. - evaluationDependsOn(":sdks:java:extensions:sql:zetasql:udf-test-provider") // Needed to resolve jarPath. - systemProperty "beam.sql.udf.test.jar_path", project(":sdks:java:extensions:sql:zetasql:udf-test-provider").jarPath - systemProperty "beam.sql.udf.test.empty_jar_path", emptyJar.archivePath -} diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java index 32d07237b42f..395ad5e7a639 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/UserFunctionDefinitions.java @@ -21,7 +21,6 @@ import com.google.zetasql.resolvedast.ResolvedNode; import com.google.zetasql.resolvedast.ResolvedNodes; import java.util.List; -import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Holds user defined function definitions. */ @@ -37,8 +36,6 @@ public abstract class UserFunctionDefinitions { */ public abstract ImmutableMap, ResolvedNode> sqlTableValuedFunctions(); - public abstract ImmutableMap, ScalarFn> javaScalarFunctions(); - @AutoValue.Builder public abstract static class Builder { public abstract Builder setSqlScalarFunctions( @@ -47,16 +44,12 @@ public abstract Builder setSqlScalarFunctions( public abstract Builder setSqlTableValuedFunctions( ImmutableMap, ResolvedNode> sqlTableValuedFunctions); - public abstract Builder setJavaScalarFunctions( - ImmutableMap, ScalarFn> javaScalarFunctions); - public abstract UserFunctionDefinitions build(); } public static Builder newBuilder() { return new AutoValue_UserFunctionDefinitions.Builder() .setSqlScalarFunctions(ImmutableMap.of()) - .setSqlTableValuedFunctions(ImmutableMap.of()) - .setJavaScalarFunctions(ImmutableMap.of()); + .setSqlTableValuedFunctions(ImmutableMap.of()); } } diff --git a/settings.gradle b/settings.gradle index d5c9212ec1b4..44875a4552da 100644 --- a/settings.gradle +++ b/settings.gradle @@ -124,9 +124,9 @@ include ":sdks:java:extensions:sql:shell" include ":sdks:java:extensions:sql:hcatalog" include ":sdks:java:extensions:sql:datacatalog" include ":sdks:java:extensions:sql:zetasql" -include ":sdks:java:extensions:sql:zetasql:udf-test-provider" include ":sdks:java:extensions:sql:expansion-service" include ":sdks:java:extensions:sql:udf" +include ":sdks:java:extensions:sql:udf-test-provider" include ":sdks:java:extensions:zetasketch" include ":sdks:java:fn-execution" include ":sdks:java:harness" From ee4d14a3271ec3b1d512d7f30c7430246ebdf582 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 28 Dec 2020 19:10:57 -0800 Subject: [PATCH 3/4] Make JavaUdfDefinitions a subclass of JavaUdfLoader. Since JavaUdfLoader should be the only consumer anyway. --- .../sql/impl/JavaUdfDefinitions.java | 40 ------------------- .../extensions/sql/impl/JavaUdfLoader.java | 29 +++++++++++--- 2 files changed, 24 insertions(+), 45 deletions(-) delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java deleted file mode 100644 index 0002babee374..000000000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfDefinitions.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.impl; - -import com.google.auto.value.AutoValue; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.udf.ScalarFn; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; - -/** Holds user defined function definitions. */ -@AutoValue -public abstract class JavaUdfDefinitions { - public abstract ImmutableMap, ScalarFn> scalarFunctions(); - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setScalarFunctions(ImmutableMap, ScalarFn> value); - - public abstract JavaUdfDefinitions build(); - } - - public static Builder newBuilder() { - return new AutoValue_JavaUdfDefinitions.Builder().setScalarFunctions(ImmutableMap.of()); - } -} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java index 254c85d7203a..94430b5d460d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import com.google.auto.value.AutoValue; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -57,7 +58,7 @@ public class JavaUdfLoader { * Maps the external jar location to the functions the jar defines. Static so it can persist * across multiple SQL transforms. */ - private static final Map cache = new HashMap<>(); + private static final Map cache = new HashMap<>(); private static final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); @@ -70,7 +71,7 @@ public class JavaUdfLoader { public ScalarFn loadScalarFunction(List functionPath, String jarPath) { String functionFullName = String.join(".", functionPath); try { - JavaUdfDefinitions functionDefinitions = loadJar(jarPath); + FunctionDefinitions functionDefinitions = loadJar(jarPath); if (!functionDefinitions.scalarFunctions().containsKey(functionPath)) { throw new IllegalArgumentException( String.format( @@ -135,7 +136,7 @@ Iterator getUdfProviders(ClassLoader classLoader) throws IOExceptio return ServiceLoader.load(UdfProvider.class, classLoader).iterator(); } - private JavaUdfDefinitions loadJar(String jarPath) throws IOException { + private FunctionDefinitions loadJar(String jarPath) throws IOException { if (cache.containsKey(jarPath)) { LOG.debug("Using cached function definitions from {}", jarPath); return cache.get(jarPath); @@ -177,11 +178,29 @@ private JavaUdfDefinitions loadJar(String jarPath) throws IOException { UdfProvider.class.getSimpleName(), jarPath, scalarFunctions.size()); - JavaUdfDefinitions userFunctionDefinitions = - JavaUdfDefinitions.newBuilder() + FunctionDefinitions userFunctionDefinitions = + FunctionDefinitions.newBuilder() .setScalarFunctions(ImmutableMap.copyOf(scalarFunctions)) .build(); cache.put(jarPath, userFunctionDefinitions); return userFunctionDefinitions; } + + /** Holds user defined function definitions. */ + @AutoValue + abstract static class FunctionDefinitions { + abstract ImmutableMap, ScalarFn> scalarFunctions(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setScalarFunctions(ImmutableMap, ScalarFn> value); + + abstract FunctionDefinitions build(); + } + + static Builder newBuilder() { + return new AutoValue_JavaUdfLoader_FunctionDefinitions.Builder() + .setScalarFunctions(ImmutableMap.of()); + } + } } From 1fbf45c53824c94a1502f6cfa4f60299a4d1c030 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 25 Jan 2021 13:14:54 -0800 Subject: [PATCH 4/4] Don't set context classloader. The classloader will need to be set in CalcFn#compile. We will do that in a subsequent PR. Also fixes nullability errors. --- .../extensions/sql/impl/JavaUdfLoader.java | 25 +++++-------------- .../sql/impl/JavaUdfLoaderTest.java | 21 +++++----------- 2 files changed, 12 insertions(+), 34 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java index 94430b5d460d..828a3973da87 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.extensions.sql.udf.UdfProvider; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.commons.codec.digest.DigestUtils; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; @@ -60,14 +59,7 @@ public class JavaUdfLoader { */ private static final Map cache = new HashMap<>(); - private static final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); - - /** - * Load a user-defined scalar function from the specified jar. - * - *

WARNING: The first time a jar is loaded, it is added to the thread's - * context {@link ClassLoader} so that the jar can be staged by the runner. - */ + /** Load a user-defined scalar function from the specified jar. */ public ScalarFn loadScalarFunction(List functionPath, String jarPath) { String functionFullName = String.join(".", functionPath); try { @@ -119,16 +111,9 @@ private File downloadFile(String inputPath, String mimeType) throws IOException } } - private ClassLoader createAndSetClassLoader(String inputJarPath) throws IOException { + private ClassLoader createClassLoader(String inputJarPath) throws IOException { File tmpJar = downloadFile(inputJarPath, "application/java-archive"); - // Set the thread's context class loader so that the jar can be staged by the runner. - Thread.currentThread() - .setContextClassLoader( - new URLClassLoader( - new URL[] {tmpJar.toURI().toURL()}, ReflectHelpers.findClassLoader())); - // Return a class loader that isolates the target jar from other UDF jars that might have been - // loaded previously. - return new URLClassLoader(new URL[] {tmpJar.toURI().toURL()}, originalClassLoader); + return new URLClassLoader(new URL[] {tmpJar.toURI().toURL()}); } @VisibleForTesting @@ -142,8 +127,8 @@ private FunctionDefinitions loadJar(String jarPath) throws IOException { return cache.get(jarPath); } + ClassLoader classLoader = createClassLoader(jarPath); Map, ScalarFn> scalarFunctions = new HashMap<>(); - ClassLoader classLoader = createAndSetClassLoader(jarPath); Iterator providers = getUdfProviders(classLoader); int providersCount = 0; while (providers.hasNext()) { @@ -182,7 +167,9 @@ private FunctionDefinitions loadJar(String jarPath) throws IOException { FunctionDefinitions.newBuilder() .setScalarFunctions(ImmutableMap.copyOf(scalarFunctions)) .build(); + cache.put(jarPath, userFunctionDefinitions); + return userFunctionDefinitions; } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java index c6e090176770..8a4d9d2b0f70 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java @@ -26,7 +26,6 @@ import java.util.Iterator; import org.apache.beam.sdk.extensions.sql.udf.UdfProvider; import org.apache.beam.sdk.util.common.ReflectHelpers; -import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -42,20 +41,18 @@ public class JavaUdfLoaderTest { private final String jarPathProperty = "beam.sql.udf.test.jar_path"; private final String emptyJarPathProperty = "beam.sql.udf.test.empty_jar_path"; - private final @Nullable String jarPath = System.getProperty(jarPathProperty); - private final @Nullable String emptyJarPath = System.getProperty(emptyJarPathProperty); - - private final ClassLoader originalClassLoader = ReflectHelpers.findClassLoader(); + private final String jarPath = System.getProperty(jarPathProperty, ""); + private final String emptyJarPath = System.getProperty(emptyJarPathProperty, ""); @Before public void setUp() { - if (jarPath == null) { + if (jarPath.isEmpty()) { fail( String.format( "System property %s must be set to run %s.", jarPathProperty, JavaUdfLoaderTest.class.getSimpleName())); } - if (emptyJarPath == null) { + if (emptyJarPath.isEmpty()) { fail( String.format( "System property %s must be set to run %s.", @@ -70,21 +67,18 @@ public void setUp() { @Test public void testClassLoaderHasNoUdfProviders() throws IOException { JavaUdfLoader udfLoader = new JavaUdfLoader(); - Iterator udfProviders = udfLoader.getUdfProviders(originalClassLoader); + Iterator udfProviders = + udfLoader.getUdfProviders(ReflectHelpers.findClassLoader()); assertFalse(udfProviders.hasNext()); } @Test - @SuppressWarnings( - "nullness") // We check if jarPath is null in setUp, but the checker framework doesn't know. public void testLoadScalarFunction() { JavaUdfLoader udfLoader = new JavaUdfLoader(); udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), jarPath); } @Test - @SuppressWarnings( - "nullness") // We check if jarPath is null in setUp, but the checker framework doesn't know. public void testLoadUnregisteredScalarFunctionThrowsRuntimeException() { JavaUdfLoader udfLoader = new JavaUdfLoader(); thrown.expect(RuntimeException.class); @@ -94,9 +88,6 @@ public void testLoadUnregisteredScalarFunctionThrowsRuntimeException() { } @Test - @SuppressWarnings( - "nullness") // We check if jarPath and emptyJarPath are null in setUp, but the checker - // framework doesn't know. public void testJarMissingUdfProviderThrowsProviderNotFoundException() { JavaUdfLoader udfLoader = new JavaUdfLoader(); thrown.expect(ProviderNotFoundException.class);