diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle
index aa52d080fdf6..3b12082034ae 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -90,6 +90,7 @@ dependencies {
testCompile library.java.testcontainers_kafka
testCompile library.java.google_cloud_bigtable_emulator
testCompile project(path: ":sdks:java:io:mongodb", configuration: "testRuntime")
+ testCompileOnly project(":sdks:java:extensions:sql:udf-test-provider")
testRuntimeClasspath library.java.slf4j_jdk14
hadoopVersions.each {kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"
@@ -224,3 +225,16 @@ hadoopVersions.each { kv ->
include '**/*Test.class'
}
}
+
+task emptyJar(type: Jar) {
+ archiveBaseName = "${project.archivesBaseName}-empty-jar"
+ from fileTree(dir: getTemporaryDir().createNewFile().toString())
+}
+
+test {
+ dependsOn emptyJar
+ // Pass jars used by Java UDF tests via system properties.
+ evaluationDependsOn(":sdks:java:extensions:sql:udf-test-provider") // Needed to resolve jarPath.
+ systemProperty "beam.sql.udf.test.jar_path", project(":sdks:java:extensions:sql:udf-test-provider").jarPath
+ systemProperty "beam.sql.udf.test.empty_jar_path", emptyJar.archivePath
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java
new file mode 100644
index 000000000000..828a3973da87
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.auto.value.AutoValue;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.ProviderNotFoundException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.extensions.sql.udf.ScalarFn;
+import org.apache.beam.sdk.extensions.sql.udf.UdfProvider;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.commons.codec.digest.DigestUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads {@link UdfProvider} implementations from user-provided jars.
+ *
+ *
All UDFs are loaded and cached for each jar to mitigate IO costs.
+ */
+public class JavaUdfLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(JavaUdfLoader.class);
+
+ /**
+ * Maps the external jar location to the functions the jar defines. Static so it can persist
+ * across multiple SQL transforms.
+ */
+ private static final Map cache = new HashMap<>();
+
+ /** Load a user-defined scalar function from the specified jar. */
+ public ScalarFn loadScalarFunction(List functionPath, String jarPath) {
+ String functionFullName = String.join(".", functionPath);
+ try {
+ FunctionDefinitions functionDefinitions = loadJar(jarPath);
+ if (!functionDefinitions.scalarFunctions().containsKey(functionPath)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "No implementation of scalar function %s found in %s.%n"
+ + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).%n"
+ + " 2. Add function %s to the class's userDefinedScalarFunctions implementation.",
+ functionFullName,
+ jarPath,
+ UdfProvider.class.getSimpleName(),
+ UdfProvider.class.getSimpleName(),
+ functionFullName));
+ }
+ return functionDefinitions.scalarFunctions().get(functionPath);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to load user-defined scalar function %s from %s", functionFullName, jarPath),
+ e);
+ }
+ }
+
+ /**
+ * Creates a temporary local copy of the file at {@code inputPath}, and returns a handle to the
+ * local copy.
+ */
+ private File downloadFile(String inputPath, String mimeType) throws IOException {
+ Preconditions.checkArgument(!inputPath.isEmpty(), "Path cannot be empty.");
+ ResourceId inputResource = FileSystems.matchNewResource(inputPath, false /* is directory */);
+ try (ReadableByteChannel inputChannel = FileSystems.open(inputResource)) {
+ File outputFile = File.createTempFile("sql-udf-", inputResource.getFilename());
+ ResourceId outputResource =
+ FileSystems.matchNewResource(outputFile.getAbsolutePath(), false /* is directory */);
+ try (WritableByteChannel outputChannel = FileSystems.create(outputResource, mimeType)) {
+ ByteStreams.copy(inputChannel, outputChannel);
+ }
+ // Compute and log checksum.
+ try (InputStream inputStream = new FileInputStream(outputFile)) {
+ LOG.info(
+ "Copied {} to {} with md5 hash {}.",
+ inputPath,
+ outputFile.getAbsolutePath(),
+ DigestUtils.md5Hex(inputStream));
+ }
+ return outputFile;
+ }
+ }
+
+ private ClassLoader createClassLoader(String inputJarPath) throws IOException {
+ File tmpJar = downloadFile(inputJarPath, "application/java-archive");
+ return new URLClassLoader(new URL[] {tmpJar.toURI().toURL()});
+ }
+
+ @VisibleForTesting
+ Iterator getUdfProviders(ClassLoader classLoader) throws IOException {
+ return ServiceLoader.load(UdfProvider.class, classLoader).iterator();
+ }
+
+ private FunctionDefinitions loadJar(String jarPath) throws IOException {
+ if (cache.containsKey(jarPath)) {
+ LOG.debug("Using cached function definitions from {}", jarPath);
+ return cache.get(jarPath);
+ }
+
+ ClassLoader classLoader = createClassLoader(jarPath);
+ Map, ScalarFn> scalarFunctions = new HashMap<>();
+ Iterator providers = getUdfProviders(classLoader);
+ int providersCount = 0;
+ while (providers.hasNext()) {
+ providersCount++;
+ UdfProvider provider = providers.next();
+ provider
+ .userDefinedScalarFunctions()
+ .forEach(
+ (functionName, implementation) -> {
+ List functionPath = ImmutableList.copyOf(functionName.split("\\."));
+ if (scalarFunctions.containsKey(functionPath)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Found multiple definitions of scalar function %s in %s.",
+ functionName, jarPath));
+ }
+ scalarFunctions.put(functionPath, implementation);
+ });
+ }
+ if (providersCount == 0) {
+ throw new ProviderNotFoundException(
+ String.format(
+ "No %s implementation found in %s. Create a class implementing %s and annotate it with @AutoService(%s.class).",
+ UdfProvider.class.getSimpleName(),
+ jarPath,
+ UdfProvider.class.getSimpleName(),
+ UdfProvider.class.getSimpleName()));
+ }
+ LOG.info(
+ "Loaded {} implementations of {} from {} with {} scalar function(s).",
+ providersCount,
+ UdfProvider.class.getSimpleName(),
+ jarPath,
+ scalarFunctions.size());
+ FunctionDefinitions userFunctionDefinitions =
+ FunctionDefinitions.newBuilder()
+ .setScalarFunctions(ImmutableMap.copyOf(scalarFunctions))
+ .build();
+
+ cache.put(jarPath, userFunctionDefinitions);
+
+ return userFunctionDefinitions;
+ }
+
+ /** Holds user defined function definitions. */
+ @AutoValue
+ abstract static class FunctionDefinitions {
+ abstract ImmutableMap, ScalarFn> scalarFunctions();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setScalarFunctions(ImmutableMap, ScalarFn> value);
+
+ abstract FunctionDefinitions build();
+ }
+
+ static Builder newBuilder() {
+ return new AutoValue_JavaUdfLoader_FunctionDefinitions.Builder()
+ .setScalarFunctions(ImmutableMap.of());
+ }
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java
new file mode 100644
index 000000000000..8a4d9d2b0f70
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoaderTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.file.ProviderNotFoundException;
+import java.util.Collections;
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.udf.UdfProvider;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link JavaUdfLoader}. */
+@RunWith(JUnit4.class)
+public class JavaUdfLoaderTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private final String jarPathProperty = "beam.sql.udf.test.jar_path";
+ private final String emptyJarPathProperty = "beam.sql.udf.test.empty_jar_path";
+
+ private final String jarPath = System.getProperty(jarPathProperty, "");
+ private final String emptyJarPath = System.getProperty(emptyJarPathProperty, "");
+
+ @Before
+ public void setUp() {
+ if (jarPath.isEmpty()) {
+ fail(
+ String.format(
+ "System property %s must be set to run %s.",
+ jarPathProperty, JavaUdfLoaderTest.class.getSimpleName()));
+ }
+ if (emptyJarPath.isEmpty()) {
+ fail(
+ String.format(
+ "System property %s must be set to run %s.",
+ emptyJarPathProperty, JavaUdfLoaderTest.class.getSimpleName()));
+ }
+ }
+
+ /**
+ * Test that the parent classloader does not load any implementations of {@link UdfProvider}. This
+ * is important because we do not want to pollute the user's namespace.
+ */
+ @Test
+ public void testClassLoaderHasNoUdfProviders() throws IOException {
+ JavaUdfLoader udfLoader = new JavaUdfLoader();
+ Iterator udfProviders =
+ udfLoader.getUdfProviders(ReflectHelpers.findClassLoader());
+ assertFalse(udfProviders.hasNext());
+ }
+
+ @Test
+ public void testLoadScalarFunction() {
+ JavaUdfLoader udfLoader = new JavaUdfLoader();
+ udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), jarPath);
+ }
+
+ @Test
+ public void testLoadUnregisteredScalarFunctionThrowsRuntimeException() {
+ JavaUdfLoader udfLoader = new JavaUdfLoader();
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage(
+ String.format("No implementation of scalar function notRegistered found in %s.", jarPath));
+ udfLoader.loadScalarFunction(Collections.singletonList("notRegistered"), jarPath);
+ }
+
+ @Test
+ public void testJarMissingUdfProviderThrowsProviderNotFoundException() {
+ JavaUdfLoader udfLoader = new JavaUdfLoader();
+ thrown.expect(ProviderNotFoundException.class);
+ thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath));
+ // Load from an inhabited jar first so we can make sure we load UdfProviders in isolation
+ // from other jars.
+ udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), jarPath);
+ udfLoader.loadScalarFunction(Collections.singletonList("helloWorld"), emptyJarPath);
+ }
+}
diff --git a/sdks/java/extensions/sql/udf-test-provider/build.gradle b/sdks/java/extensions/sql/udf-test-provider/build.gradle
new file mode 100644
index 000000000000..6769eba1d994
--- /dev/null
+++ b/sdks/java/extensions/sql/udf-test-provider/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+ id 'org.apache.beam.module'
+}
+
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.sql.provider')
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: SQL :: UDF test provider"
+ext.summary = "Java UDFs for testing. This project must be built separately from its parent so the UDF provider is not included in the context classloader for tests."
+
+project.ext.jarPath = jar.archivePath
+
+dependencies {
+ // No dependency (direct or transitive) on :sdks:java:core.
+ compile project(":sdks:java:extensions:sql:udf")
+ compile library.java.guava
+}
diff --git a/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java
new file mode 100644
index 000000000000..1879e0d80680
--- /dev/null
+++ b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.provider;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.udf.ScalarFn;
+import org.apache.beam.sdk.extensions.sql.udf.UdfProvider;
+
+/** Defines Java UDFs for use in tests. */
+@AutoService(UdfProvider.class)
+public class UdfTestProvider implements UdfProvider {
+ @Override
+ public Map userDefinedScalarFunctions() {
+ return ImmutableMap.of("helloWorld", new HelloWorldFn());
+ }
+
+ public static class HelloWorldFn extends ScalarFn {
+ @ApplyMethod
+ public String helloWorld() {
+ return "Hello world!";
+ }
+ }
+}
diff --git a/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java
new file mode 100644
index 000000000000..0ca46e07eccd
--- /dev/null
+++ b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Package containing UDF providers for testing. */
+package org.apache.beam.sdk.extensions.sql.provider;
diff --git a/settings.gradle b/settings.gradle
index 8ccf0bc54676..44875a4552da 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -126,6 +126,7 @@ include ":sdks:java:extensions:sql:datacatalog"
include ":sdks:java:extensions:sql:zetasql"
include ":sdks:java:extensions:sql:expansion-service"
include ":sdks:java:extensions:sql:udf"
+include ":sdks:java:extensions:sql:udf-test-provider"
include ":sdks:java:extensions:zetasketch"
include ":sdks:java:fn-execution"
include ":sdks:java:harness"