Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ function ci::verify_mesh_function() {
function ci::print_function_log() {
FUNCTION_NAME=$1
${KUBECTL} describe pod -lname=${FUNCTION_NAME}
sleep 120
${KUBECTL} logs -lname=${FUNCTION_NAME} --all-containers=true
}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:

- name: InstallKubebuilder
run: |
curl -L https://go.kubebuilder.io/dl/2.3.1/linux/amd64 | tar -xz -C /tmp/
curl -L https://github.com/kubernetes-sigs/kubebuilder/releases/download/v2.3.1/kubebuilder_2.3.1_linux_amd64.tar.gz | tar -xz -C /tmp/
sudo mv /tmp/kubebuilder_2.3.1_linux_amd64 /usr/local/kubebuilder
export PATH=$PATH:/usr/local/kubebuilder/bin

Expand Down
2 changes: 1 addition & 1 deletion mesh-worker-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<version>v0.1.7-rc1</version>

<properties>
<pulsar.version>2.8.0.7</pulsar.version>
<pulsar.version>2.8.0.13</pulsar.version>
<lombok.version>1.18.16</lombok.version>
<log4j2.version>2.14.0</log4j2.version>
<kubernetes-client.version>12.0.1</kubernetes-client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public void registerFunction(final String tenant,
functionName,
functionPkgUrl,
functionConfig,
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
cluster
cluster,
worker()
);
// override namespace by configuration file
v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
Expand Down Expand Up @@ -211,8 +211,8 @@ public void updateFunction(final String tenant,
functionName,
functionPkgUrl,
functionConfig,
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(),
cluster
cluster,
worker()
);
Call getCall = worker().getCustomObjectsApi().getNamespacedCustomObjectCall(
group,
Expand Down Expand Up @@ -246,6 +246,7 @@ public void updateFunction(final String tenant,
executeCall(replaceCall, V1alpha1Function.class);
} catch (Exception e) {
log.error("update {}/{}/{} function failed, error message: {}", tenant, namespace, functionName, e);
e.printStackTrace();
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
Expand Down Expand Up @@ -611,4 +612,5 @@ private void upsertFunction(final String tenant,
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.functionmesh.compute.util;

import com.google.gson.Gson;
import io.functionmesh.compute.MeshWorkerService;
import io.functionmesh.compute.functions.models.V1alpha1Function;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpec;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecGolang;
Expand All @@ -36,18 +37,24 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.Strings;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -66,7 +73,8 @@ public class FunctionsUtil {

public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String kind, String group, String version
, String functionName, String functionPkgUrl, FunctionConfig functionConfig
, Map<String, Object> customConfigs, String cluster) {
, String cluster, MeshWorkerService worker) {
Map<String, Object> customConfigs = worker.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
CustomRuntimeOptions customRuntimeOptions = CommonUtil.getCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
String clusterName = CommonUtil.getClusterName(cluster, customRuntimeOptions);

Expand Down Expand Up @@ -236,28 +244,70 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k
// v1alpha1FunctionSpecPulsar.setAuthConfig(CommonUtil.getPulsarClusterAuthConfigMapName(clusterName));
v1alpha1FunctionSpec.setPulsar(v1alpha1FunctionSpecPulsar);

String location = String.format("%s/%s/%s", functionConfig.getTenant(), functionConfig.getNamespace(),
functionName);
if (StringUtils.isNotEmpty(functionPkgUrl)) {
location = functionPkgUrl;
// TODO: dynamic file name to function CRD
String fileName = "/pulsar/function-executable";
boolean isPkgUrlProvided = StringUtils.isNotEmpty(functionPkgUrl);
File componentPackageFile = null;
try {
if (isPkgUrlProvided) {
if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
componentPackageFile = downloadPackageFile(worker, functionPkgUrl);
} else {
log.warn("get unsupported function package url {}", functionPkgUrl);
throw new IllegalArgumentException("Function Package url is not valid. supported url (function/sink/source)");
}
} else {
// TODO: support upload JAR to bk
throw new IllegalArgumentException("uploading package to mesh worker service is not supported yet.");
}
} catch (Exception e) {
log.error("Invalid register function request {}: {}", functionName, e);
e.printStackTrace();
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
Class<?>[] typeArgs = null;
if (componentPackageFile != null) {
typeArgs = extractTypeArgs(functionConfig, componentPackageFile);
}
if (StringUtils.isNotEmpty(functionConfig.getJar())) {
V1alpha1FunctionSpecJava v1alpha1FunctionSpecJava = new V1alpha1FunctionSpecJava();
Path path = Paths.get(functionConfig.getJar());
v1alpha1FunctionSpecJava.setJar(path.getFileName().toString());
v1alpha1FunctionSpecJava.setJarLocation(location);
v1alpha1FunctionSpecJava.setJar(fileName);
if (isPkgUrlProvided) {
v1alpha1FunctionSpecJava.setJarLocation(functionPkgUrl);
}
String extraDependenciesDir = "";
if (StringUtils.isNotEmpty(worker.getFactoryConfig().getExtraFunctionDependenciesDir())) {
if (Paths.get(worker.getFactoryConfig().getExtraFunctionDependenciesDir()).isAbsolute()) {
extraDependenciesDir = worker.getFactoryConfig().getExtraFunctionDependenciesDir();
} else {
extraDependenciesDir = "/pulsar/" + worker.getFactoryConfig().getExtraFunctionDependenciesDir();
}
} else {
extraDependenciesDir = "/pulsar/instances/deps";
}
v1alpha1FunctionSpecJava.setExtraDependenciesDir(extraDependenciesDir);
v1alpha1FunctionSpec.setJava(v1alpha1FunctionSpecJava);
if (typeArgs != null) {
if (typeArgs.length == 2 && typeArgs[0] != null) {
v1alpha1FunctionSpecInput.setTypeClassName(typeArgs[0].getName());
}
if (typeArgs.length == 2 && typeArgs[1] != null) {
v1alpha1FunctionSpecOutput.setTypeClassName(typeArgs[1].getName());
}
}
} else if (StringUtils.isNotEmpty(functionConfig.getPy())) {
V1alpha1FunctionSpecPython v1alpha1FunctionSpecPython = new V1alpha1FunctionSpecPython();
Path path = Paths.get(functionConfig.getPy());
v1alpha1FunctionSpecPython.setPy(path.getFileName().toString());
v1alpha1FunctionSpecPython.setPyLocation(location);
v1alpha1FunctionSpecPython.setPy(fileName);
if (isPkgUrlProvided) {
v1alpha1FunctionSpecPython.setPyLocation(functionPkgUrl);
}
v1alpha1FunctionSpec.setPython(v1alpha1FunctionSpecPython);
} else if (StringUtils.isNotEmpty(functionConfig.getGo())) {
V1alpha1FunctionSpecGolang v1alpha1FunctionSpecGolang = new V1alpha1FunctionSpecGolang();
Path path = Paths.get(functionConfig.getGo());
v1alpha1FunctionSpecGolang.setGo(path.getFileName().toString());
v1alpha1FunctionSpecGolang.setGoLocation(location);
v1alpha1FunctionSpecGolang.setGo(fileName);
if (isPkgUrlProvided) {
v1alpha1FunctionSpecGolang.setGoLocation(functionPkgUrl);
}
v1alpha1FunctionSpec.setGolang(v1alpha1FunctionSpecGolang);
}

Expand Down Expand Up @@ -422,12 +472,21 @@ public static FunctionConfig createFunctionConfigFromV1alpha1Function(String ten
if (v1alpha1FunctionSpec.getJava() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJar());
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getJava().getJarLocation())) {
functionConfig.setJar(v1alpha1FunctionSpec.getJava().getJarLocation());
}
} else if (v1alpha1FunctionSpec.getPython() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
functionConfig.setPy(v1alpha1FunctionSpec.getPython().getPy());
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getPython().getPyLocation())) {
functionConfig.setJar(v1alpha1FunctionSpec.getPython().getPyLocation());
}
} else if (v1alpha1FunctionSpec.getGolang() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.GO);
functionConfig.setGo(v1alpha1FunctionSpec.getGolang().getGo());
if (Strings.isNotEmpty(v1alpha1FunctionSpec.getGolang().getGoLocation())) {
functionConfig.setJar(v1alpha1FunctionSpec.getGolang().getGoLocation());
}
}
if (v1alpha1FunctionSpec.getMaxMessageRetry() != null) {
functionConfig.setMaxMessageRetries(v1alpha1FunctionSpec.getMaxMessageRetry());
Expand Down Expand Up @@ -497,4 +556,35 @@ public static void convertFunctionStatusToInstanceStatusData(InstanceCommunicati
functionInstanceStatusData.setLastInvocationTime(functionStatus.getLastInvocationTime());
}

private static File downloadPackageFile(MeshWorkerService worker, String packageName) throws IOException, PulsarAdminException {
Path tempDirectory;
if (worker.getWorkerConfig().getDownloadDirectory() != null) {
tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
} else {
// use the Nar extraction directory as a temporary directory for downloaded files
tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
}
File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
worker.getBrokerAdmin().packages().download(packageName, file.toString());
return file;
}

private static Class<?>[] extractTypeArgs(final FunctionConfig functionConfig,
final File componentPackageFile) {
Class<?>[] typeArgs = null;
if (componentPackageFile == null) {
return null;
}
ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && clsLoader != null) {
try {
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
}
return typeArgs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -78,6 +79,7 @@
Response.class,
RealResponseBody.class,
CommonUtil.class,
FunctionsUtil.class,
InstanceControlGrpc.InstanceControlFutureStub.class})
@PowerMockIgnore({"javax.management.*"})
public class FunctionsImplTest {
Expand Down Expand Up @@ -316,9 +318,13 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
CustomObjectsApi customObjectsApi = PowerMockito.mock(CustomObjectsApi.class);
PowerMockito.when(meshWorkerService.getCustomObjectsApi()).thenReturn(customObjectsApi);
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
PowerMockito.when(workerConfig.getFunctionsWorkerServiceCustomConfigs()).thenReturn(Collections.emptyMap());
PulsarAdmin pulsarAdmin = PowerMockito.mock(PulsarAdmin.class);
PowerMockito.when(meshWorkerService.getBrokerAdmin()).thenReturn(pulsarAdmin);
Tenants tenants = PowerMockito.mock(Tenants.class);
Expand All @@ -327,6 +333,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
Response response = PowerMockito.mock(Response.class);
ResponseBody responseBody = PowerMockito.mock(RealResponseBody.class);
ApiClient apiClient = PowerMockito.mock(ApiClient.class);
PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);

String tenant = "public";
String namespace = "default";
Expand All @@ -336,12 +343,12 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
String version = "v1alpha1";
String kind = "Function";

FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);

PowerMockito.when(tenants.getTenantInfo(tenant)).thenReturn(null);

V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group,
version, functionName, null, functionConfig, Collections.emptyMap(), null);
version, functionName, functionConfig.getJar(), functionConfig, null, meshWorkerService);

String clusterName = "test-pulsar";
Map<String, String> customLabels = Maps.newHashMap();
Expand Down Expand Up @@ -380,7 +387,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
functionName,
null,
null,
null,
functionConfig.getJar(),
functionConfig,
null,
null);
Expand Down Expand Up @@ -507,6 +514,9 @@ public void updateFunctionTest() throws ApiException, IOException {
PowerMockito.when(meshWorkerService.getWorkerConfig()).thenReturn(workerConfig);
PowerMockito.when(workerConfig.isAuthorizationEnabled()).thenReturn(false);
PowerMockito.when(workerConfig.isAuthenticationEnabled()).thenReturn(false);
KubernetesRuntimeFactoryConfig factoryConfig = PowerMockito.mock(KubernetesRuntimeFactoryConfig.class);
PowerMockito.when(meshWorkerService.getFactoryConfig()).thenReturn(factoryConfig);
PowerMockito.when(factoryConfig.getExtraFunctionDependenciesDir()).thenReturn("");

Call getCall = PowerMockito.mock(Call.class);
Response getResponse = PowerMockito.mock(Response.class);
Expand Down Expand Up @@ -540,7 +550,9 @@ public void updateFunctionTest() throws ApiException, IOException {
null
)).thenReturn(getCall);

FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
PowerMockito.stub(PowerMockito.method(FunctionsUtil.class, "downloadPackageFile")).toReturn(null);

FunctionConfig functionConfig = Generate.CreateJavaFunctionWithPackageURLConfig(tenant, namespace, functionName);

PowerMockito.when(meshWorkerService.getCustomObjectsApi()
.replaceNamespacedCustomObjectCall(
Expand All @@ -564,12 +576,13 @@ public void updateFunctionTest() throws ApiException, IOException {
functionName,
null,
null,
null,
functionConfig.getJar(),
functionConfig,
null,
null,
null);
} catch (Exception exception) {
exception.printStackTrace();
Assert.fail("Expected no exception to be thrown but got exception: " + exception);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ public static FunctionConfig CreateJavaFunctionConfig(String tenant, String name
return functionConfig;
}

public static FunctionConfig CreateJavaFunctionWithPackageURLConfig(String tenant, String namespace, String functionName) {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName(functionName);
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setClassName("org.example.functions.WordCountFunction");
functionConfig.setInputs(Collections.singletonList("persistent://public/default/sentences"));
functionConfig.setParallelism(1);
functionConfig.setCleanupSubscription(true);
functionConfig.setOutput("persistent://public/default/count");
Resources resources = new Resources();
resources.setCpu(1.0);
resources.setRam(102400L);
functionConfig.setResources(resources);
CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions();
customRuntimeOptions.setClusterName(TEST_CLUSTER_NAME);
customRuntimeOptions.setInputTypeClassName("java.lang.String");
customRuntimeOptions.setOutputTypeClassName("java.lang.String");
String customRuntimeOptionsJSON = new Gson().toJson(customRuntimeOptions, CustomRuntimeOptions.class);
functionConfig.setCustomRuntimeOptions(customRuntimeOptionsJSON);
functionConfig.setJar(String.format("function://public/default/%s@1.0", functionName));
functionConfig.setAutoAck(true);
return functionConfig;
}

public static SinkConfig CreateSinkConfig(String tenant, String namespace, String functionName) {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setName(functionName);
Expand Down
Loading