diff --git a/.ci/helm.sh b/.ci/helm.sh
index 797973b27..6084974e0 100644
--- a/.ci/helm.sh
+++ b/.ci/helm.sh
@@ -234,6 +234,7 @@ function ci::verify_mesh_function() {
function ci::print_function_log() {
FUNCTION_NAME=$1
${KUBECTL} describe pod -lname=${FUNCTION_NAME}
+ sleep 120
${KUBECTL} logs -lname=${FUNCTION_NAME} --all-containers=true
}
diff --git a/.github/workflows/project.yml b/.github/workflows/project.yml
index 917b4eac3..7e1a19759 100644
--- a/.github/workflows/project.yml
+++ b/.github/workflows/project.yml
@@ -27,7 +27,7 @@ jobs:
- name: InstallKubebuilder
run: |
- curl -L https://go.kubebuilder.io/dl/2.3.1/linux/amd64 | tar -xz -C /tmp/
+ curl -L https://github.com/kubernetes-sigs/kubebuilder/releases/download/v2.3.1/kubebuilder_2.3.1_linux_amd64.tar.gz | tar -xz -C /tmp/
sudo mv /tmp/kubebuilder_2.3.1_linux_amd64 /usr/local/kubebuilder
export PATH=$PATH:/usr/local/kubebuilder/bin
diff --git a/mesh-worker-service/pom.xml b/mesh-worker-service/pom.xml
index d7c51ce6b..c52d7f0d0 100644
--- a/mesh-worker-service/pom.xml
+++ b/mesh-worker-service/pom.xml
@@ -29,7 +29,7 @@
v0.1.7-rc1
- 2.8.0.7
+ 2.8.0.13
1.18.16
2.14.0
12.0.1
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
index d40509cbc..88b64f4a6 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
@@ -142,8 +142,8 @@ public void registerFunction(final String tenant,
functionName,
functionPkgUrl,
functionConfig,
- worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
- cluster
+ cluster,
+ worker()
);
// override namespace by configuration file
v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
@@ -211,8 +211,8 @@ public void updateFunction(final String tenant,
functionName,
functionPkgUrl,
functionConfig,
- worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
- cluster
+ cluster,
+ worker()
);
Call getCall = worker().getCustomObjectsApi().getNamespacedCustomObjectCall(
group,
@@ -246,6 +246,7 @@ public void updateFunction(final String tenant,
executeCall(replaceCall, V1alpha1Function.class);
} catch (Exception e) {
log.error("update {}/{}/{} function failed, error message: {}", tenant, namespace, functionName, e);
+ e.printStackTrace();
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -611,4 +612,5 @@ private void upsertFunction(final String tenant,
}
}
}
+
}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java
index c6e35bffe..08da59231 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java
@@ -19,6 +19,7 @@
package io.functionmesh.compute.util;
import com.google.gson.Gson;
+import io.functionmesh.compute.MeshWorkerService;
import io.functionmesh.compute.functions.models.V1alpha1Function;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpec;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecGolang;
@@ -36,18 +37,24 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.Strings;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -66,7 +73,8 @@ public class FunctionsUtil {
public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String kind, String group, String version
, String functionName, String functionPkgUrl, FunctionConfig functionConfig
- , Map customConfigs, String cluster) {
+ , String cluster, MeshWorkerService worker) {
+ Map customConfigs = worker.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
CustomRuntimeOptions customRuntimeOptions = CommonUtil.getCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
String clusterName = CommonUtil.getClusterName(cluster, customRuntimeOptions);
@@ -236,28 +244,70 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k
// v1alpha1FunctionSpecPulsar.setAuthConfig(CommonUtil.getPulsarClusterAuthConfigMapName(clusterName));
v1alpha1FunctionSpec.setPulsar(v1alpha1FunctionSpecPulsar);
- String location = String.format("%s/%s/%s", functionConfig.getTenant(), functionConfig.getNamespace(),
- functionName);
- if (StringUtils.isNotEmpty(functionPkgUrl)) {
- location = functionPkgUrl;
+ // TODO: dynamic file name to function CRD
+ String fileName = "/pulsar/function-executable";
+ boolean isPkgUrlProvided = StringUtils.isNotEmpty(functionPkgUrl);
+ File componentPackageFile = null;
+ try {
+ if (isPkgUrlProvided) {
+ if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
+ componentPackageFile = downloadPackageFile(worker, functionPkgUrl);
+ } else {
+ log.warn("get unsupported function package url {}", functionPkgUrl);
+ throw new IllegalArgumentException("Function Package url is not valid. supported url (function/sink/source)");
+ }
+ } else {
+ // TODO: support upload JAR to bk
+ throw new IllegalArgumentException("uploading package to mesh worker service is not supported yet.");
+ }
+ } catch (Exception e) {
+ log.error("Invalid register function request {}: {}", functionName, e);
+ e.printStackTrace();
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+ Class>[] typeArgs = null;
+ if (componentPackageFile != null) {
+ typeArgs = extractTypeArgs(functionConfig, componentPackageFile);
}
if (StringUtils.isNotEmpty(functionConfig.getJar())) {
V1alpha1FunctionSpecJava v1alpha1FunctionSpecJava = new V1alpha1FunctionSpecJava();
- Path path = Paths.get(functionConfig.getJar());
- v1alpha1FunctionSpecJava.setJar(path.getFileName().toString());
- v1alpha1FunctionSpecJava.setJarLocation(location);
+ v1alpha1FunctionSpecJava.setJar(fileName);
+ if (isPkgUrlProvided) {
+ v1alpha1FunctionSpecJava.setJarLocation(functionPkgUrl);
+ }
+ String extraDependenciesDir = "";
+ if (StringUtils.isNotEmpty(worker.getFactoryConfig().getExtraFunctionDependenciesDir())) {
+ if (Paths.get(worker.getFactoryConfig().getExtraFunctionDependenciesDir()).isAbsolute()) {
+ extraDependenciesDir = worker.getFactoryConfig().getExtraFunctionDependenciesDir();
+ } else {
+ extraDependenciesDir = "/pulsar/" + worker.getFactoryConfig().getExtraFunctionDependenciesDir();
+ }
+ } else {
+ extraDependenciesDir = "/pulsar/instances/deps";
+ }
+ v1alpha1FunctionSpecJava.setExtraDependenciesDir(extraDependenciesDir);
v1alpha1FunctionSpec.setJava(v1alpha1FunctionSpecJava);
+ if (typeArgs != null) {
+ if (typeArgs.length == 2 && typeArgs[0] != null) {
+ v1alpha1FunctionSpecInput.setTypeClassName(typeArgs[0].getName());
+ }
+ if (typeArgs.length == 2 && typeArgs[1] != null) {
+ v1alpha1FunctionSpecOutput.setTypeClassName(typeArgs[1].getName());
+ }
+ }
} else if (StringUtils.isNotEmpty(functionConfig.getPy())) {
V1alpha1FunctionSpecPython v1alpha1FunctionSpecPython = new V1alpha1FunctionSpecPython();
- Path path = Paths.get(functionConfig.getPy());
- v1alpha1FunctionSpecPython.setPy(path.getFileName().toString());
- v1alpha1FunctionSpecPython.setPyLocation(location);
+ v1alpha1FunctionSpecPython.setPy(fileName);
+ if (isPkgUrlProvided) {
+ v1alpha1FunctionSpecPython.setPyLocation(functionPkgUrl);
+ }
v1alpha1FunctionSpec.setPython(v1alpha1FunctionSpecPython);
} else if (StringUtils.isNotEmpty(functionConfig.getGo())) {
V1alpha1FunctionSpecGolang v1alpha1FunctionSpecGolang = new V1alpha1FunctionSpecGolang();
- Path path = Paths.get(functionConfig.getGo());
- v1alpha1FunctionSpecGolang.setGo(path.getFileName().toString());
- v1alpha1FunctionSpecGolang.setGoLocation(location);
+ v1alpha1FunctionSpecGolang.setGo(fileName);
+ if (isPkgUrlProvided) {
+ v1alpha1FunctionSpecGolang.setGoLocation(functionPkgUrl);
+ }
v1alpha1FunctionSpec.setGolang(v1alpha1FunctionSpecGolang);
}
@@ -422,12 +472,21 @@ public static FunctionConfig createFunctionConfigFromV1alpha1Function(String ten
if (v1alpha1FunctionSpec.getJava() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJar());
+ if (Strings.isNotEmpty(v1alpha1FunctionSpec.getJava().getJarLocation())) {
+ functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJarLocation());
+ }
} else if (v1alpha1FunctionSpec.getPython() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
functionConfig.setPy(v1alpha1FunctionSpec.getPython().getPy());
+ if (Strings.isNotEmpty(v1alpha1FunctionSpec.getPython().getPyLocation())) {
+ functionConfig.setJar(v1alpha1FunctionSpec.getPython().getPyLocation());
+ }
} else if (v1alpha1FunctionSpec.getGolang() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.GO);
functionConfig.setGo(v1alpha1FunctionSpec.getGolang().getGo());
+ if (Strings.isNotEmpty(v1alpha1FunctionSpec.getGolang().getGoLocation())) {
+ functionConfig.setJar(v1alpha1FunctionSpec.getGolang().getGoLocation());
+ }
}
if (v1alpha1FunctionSpec.getMaxMessageRetry() != null) {
functionConfig.setMaxMessageRetries(v1alpha1FunctionSpec.getMaxMessageRetry());
@@ -497,4 +556,35 @@ public static void convertFunctionStatusToInstanceStatusData(InstanceCommunicati
functionInstanceStatusData.setLastInvocationTime(functionStatus.getLastInvocationTime());
}
+ private static File downloadPackageFile(MeshWorkerService worker, String packageName) throws IOException, PulsarAdminException {
+ Path tempDirectory;
+ if (worker.getWorkerConfig().getDownloadDirectory() != null) {
+ tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
+ } else {
+ // use the Nar extraction directory as a temporary directory for downloaded files
+ tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
+ }
+ File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
+ worker.getBrokerAdmin().packages().download(packageName, file.toString());
+ return file;
+ }
+
+ private static Class>[] extractTypeArgs(final FunctionConfig functionConfig,
+ final File componentPackageFile) {
+ Class>[] typeArgs = null;
+ if (componentPackageFile == null) {
+ return null;
+ }
+ ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && clsLoader != null) {
+ try {
+ typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(
+ String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
+ }
+ }
+ return typeArgs;
+ }
+
}
diff --git a/mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java b/mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java
index 78f158445..3d77f5cf8 100644
--- a/mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java
+++ b/mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java
@@ -51,6 +51,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -78,6 +79,7 @@
Response.class,
RealResponseBody.class,
CommonUtil.class,
+ FunctionsUtil.class,
InstanceControlGrpc.InstanceControlFutureStub.class})
@PowerMockIgnore({"javax.management.*"})
public class FunctionsImplTest {
@@ -316,9 +318,13 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
CustomObjectsApi customObjectsApi = PowerMockito.mock(CustomObjectsApi.class);
PowerMockito.when(meshWorkerService.getCustomObjectsApi()).thenReturn(customObjectsApi);
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+ KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+ PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
+ PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
+ PowerMockito.when(workerConfig.getFunctionsWorkerServiceCustomConfigs()).thenReturn(Collections.emptyMap());
PulsarAdmin pulsarAdmin = PowerMockito.mock(PulsarAdmin.class);
PowerMockito.when(meshWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
Tenants tenants = PowerMockito.mock(Tenants.class);
@@ -327,6 +333,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
Response response = PowerMockito.mock(Response.class);
ResponseBody responseBody = PowerMockito.mock(RealResponseBody.class);
ApiClient apiClient = PowerMockito.mock(ApiClient.class);
+ PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
String tenant = "public";
String namespace = "default";
@@ -336,12 +343,12 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
String version = "v1alpha1";
String kind = "Function";
- FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
+ FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
PowerMockito.when(tenants.getTenantInfo(tenant)).thenReturn(null);
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group,
- version, functionName, null, functionConfig, Collections.emptyMap(), null);
+ version, functionName, functionConfig.getJar(), functionConfig, null, meshWorkerService);
String clusterName = "test-pulsar";
Map customLabels = Maps.newHashMap();
@@ -380,7 +387,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
functionName,
null,
null,
- null,
+ functionConfig.getJar(),
functionConfig,
null,
null);
@@ -507,6 +514,9 @@ public void updateFunctionTest() throws ApiException, IOException {
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
+ KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
+ PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
+ PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
Call getCall = PowerMockito.mock(Call.class);
Response getResponse = PowerMockito.mock(Response.class);
@@ -540,7 +550,9 @@ public void updateFunctionTest() throws ApiException, IOException {
null
)).thenReturn(getCall);
- FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
+ PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
+
+ FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
PowerMockito.when(meshWorkerService.getCustomObjectsApi()
.replaceNamespacedCustomObjectCall(
@@ -564,12 +576,13 @@ public void updateFunctionTest() throws ApiException, IOException {
functionName,
null,
null,
- null,
+ functionConfig.getJar(),
functionConfig,
null,
null,
null);
} catch (Exception exception) {
+ exception.printStackTrace();
Assert.fail("Expected no exception to be thrown but got exception: " + exception);
}
}
diff --git a/mesh-worker-service/src/test/java/io/functionmesh/compute/testdata/Generate.java b/mesh-worker-service/src/test/java/io/functionmesh/compute/testdata/Generate.java
index 7540a3130..d199e6fc1 100644
--- a/mesh-worker-service/src/test/java/io/functionmesh/compute/testdata/Generate.java
+++ b/mesh-worker-service/src/test/java/io/functionmesh/compute/testdata/Generate.java
@@ -59,6 +59,31 @@ public static FunctionConfig CreateJavaFunctionConfig(String tenant, String name
return functionConfig;
}
+ public static FunctionConfig CreateJavaFunctionWithPackageURLConfig(String tenant, String namespace, String functionName) {
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setName(functionName);
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setClassName("org.example.functions.WordCountFunction");
+ functionConfig.setInputs(Collections.singletonList("persistent://public/default/sentences"));
+ functionConfig.setParallelism(1);
+ functionConfig.setCleanupSubscription(true);
+ functionConfig.setOutput("persistent://public/default/count");
+ Resources resources = new Resources();
+ resources.setCpu(1.0);
+ resources.setRam(102400L);
+ functionConfig.setResources(resources);
+ CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions();
+ customRuntimeOptions.setClusterName(TEST_CLUSTER_NAME);
+ customRuntimeOptions.setInputTypeClassName("java.lang.String");
+ customRuntimeOptions.setOutputTypeClassName("java.lang.String");
+ String customRuntimeOptionsJSON = new Gson().toJson(customRuntimeOptions, CustomRuntimeOptions.class);
+ functionConfig.setCustomRuntimeOptions(customRuntimeOptionsJSON);
+ functionConfig.setJar(String.format("function://public/default/%s@1.0", functionName));
+ functionConfig.setAutoAck(true);
+ return functionConfig;
+ }
+
public static SinkConfig CreateSinkConfig(String tenant, String namespace, String functionName) {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setName(functionName);
diff --git a/mesh-worker-service/src/test/java/io/functionmesh/compute/util/FunctionsUtilTest.java b/mesh-worker-service/src/test/java/io/functionmesh/compute/util/FunctionsUtilTest.java
index b56213e98..d173ed9c6 100644
--- a/mesh-worker-service/src/test/java/io/functionmesh/compute/util/FunctionsUtilTest.java
+++ b/mesh-worker-service/src/test/java/io/functionmesh/compute/util/FunctionsUtilTest.java
@@ -18,14 +18,37 @@
*/
package io.functionmesh.compute.util;
+import io.functionmesh.compute.MeshWorkerService;
import io.functionmesh.compute.functions.models.V1alpha1Function;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpec;
import io.functionmesh.compute.testdata.Generate;
+
import java.util.Collections;
+
+import io.kubernetes.client.openapi.apis.CustomObjectsApi;
+import okhttp3.Response;
+import okhttp3.internal.http.RealResponseBody;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ Response.class,
+ RealResponseBody.class,
+ CommonUtil.class,
+ FunctionsUtil.class,
+ InstanceControlGrpc.InstanceControlFutureStub.class})
+@PowerMockIgnore({"javax.management.*"})
public class FunctionsUtilTest {
@Test
public void testCreateV1alpha1FunctionFromFunctionConfig() {
@@ -41,12 +64,28 @@ public void testCreateV1alpha1FunctionFromFunctionConfig() {
String input = "persistent://public/default/sentences";
String output = "persistent://public/default/count";
String clusterName = "test-pulsar";
- String jar = "word-count.jar";
+ String jar = "/pulsar/function-executable";
+
+ MeshWorkerService meshWorkerService = PowerMockito.mock(MeshWorkerService.class);
+ CustomObjectsApi customObjectsApi = PowerMockito.mock(CustomObjectsApi.class);
+ PowerMockito.when(meshWorkerService.getCustomObjectsApi()).thenReturn(customObjectsApi);
+ WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+ KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
+ PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+ PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
+ PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
+ PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
+ PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
+ PowerMockito.when(workerConfig.getFunctionsWorkerServiceCustomConfigs()).thenReturn(Collections.emptyMap());
+ PulsarAdmin pulsarAdmin = PowerMockito.mock(PulsarAdmin.class);
+ PowerMockito.when(meshWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
+ PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
- FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
+
+ FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group, version,
- functionName, null, functionConfig, Collections.emptyMap(), null);
+ functionName, functionConfig.getJar(), functionConfig, null, meshWorkerService);
Assert.assertEquals(v1alpha1Function.getKind(), kind);
@@ -73,10 +112,25 @@ public void testCreateFunctionConfigFromV1alpha1Function() {
String version = "v1alpha1";
String kind = "Function";
- FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
+ MeshWorkerService meshWorkerService = PowerMockito.mock(MeshWorkerService.class);
+ CustomObjectsApi customObjectsApi = PowerMockito.mock(CustomObjectsApi.class);
+ PowerMockito.when(meshWorkerService.getCustomObjectsApi()).thenReturn(customObjectsApi);
+ WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+ KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
+ PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+ PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
+ PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
+ PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
+ PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
+ PowerMockito.when(workerConfig.getFunctionsWorkerServiceCustomConfigs()).thenReturn(Collections.emptyMap());
+ PulsarAdmin pulsarAdmin = PowerMockito.mock(PulsarAdmin.class);
+ PowerMockito.when(meshWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
+ PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);
+
+ FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group, version,
- functionName, null, functionConfig, Collections.emptyMap(), null);
+ functionName, functionConfig.getJar(), functionConfig, null, meshWorkerService);
FunctionConfig newFunctionConfig = FunctionsUtil.createFunctionConfigFromV1alpha1Function(tenant, namespace,
functionName, v1alpha1Function);