diff --git a/mesh-worker-service/pom.xml b/mesh-worker-service/pom.xml
index d2d345703..9b54016f1 100644
--- a/mesh-worker-service/pom.xml
+++ b/mesh-worker-service/pom.xml
@@ -29,7 +29,7 @@
v0.1.5-SNAPSHOT
- 2.8.0-rc-202103292206
+ 2.8.0-rc-202105140121
1.18.16
2.14.0
10.0.1
@@ -42,7 +42,7 @@
- org.apache.pulsar
+ io.streamnative
pulsar-functions-worker
${pulsar.version}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java
index 3bb3bd76f..20584fea9 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java
@@ -28,7 +28,6 @@
import io.kubernetes.client.util.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -38,11 +37,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
-import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -76,7 +72,6 @@ public class MeshWorkerService implements WorkerService {
private CustomObjectsApi customObjectsApi;
private ApiClient apiClient;
private PulsarAdmin brokerAdmin;
- private Optional authProvider;
private KubernetesRuntimeFactoryConfig factoryConfig;
private AuthenticationService authenticationService;
@@ -138,13 +133,6 @@ public void init(WorkerConfig workerConfig) throws Exception {
this.workerConfig = workerConfig;
this.initKubernetesClient();
this.authenticationEnabled = this.workerConfig.isAuthenticationEnabled();
- if (this.workerConfig.isAuthenticationEnabled() && !StringUtils.isEmpty(this.workerConfig.getFunctionAuthProviderClassName())) {
- Optional functionAuthProvider = Optional.empty();
- functionAuthProvider = Optional.of(FunctionAuthProvider.getAuthProvider(workerConfig.getFunctionAuthProviderClassName()));
- KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
- kubernetesFunctionAuthProvider.initialize(coreV1Api, null, null);
- this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
- }
this.functions = new FunctionsImpl(() -> MeshWorkerService.this);
this.sources = new SourcesImpl(() -> MeshWorkerService.this);
this.sinks = new SinksImpl(() -> MeshWorkerService.this);
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
index 4bdc354ed..cc4d1adf4 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java
@@ -20,6 +20,8 @@
import com.google.common.collect.Maps;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPod;
+import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumeMounts;
+import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumes;
import io.functionmesh.compute.util.FunctionsUtil;
import io.functionmesh.compute.functions.models.V1alpha1Function;
import io.functionmesh.compute.MeshWorkerService;
@@ -36,10 +38,10 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.service.api.Functions;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -102,6 +104,8 @@ public void registerFunction(final String tenant,
functionPkgUrl,
functionConfig
);
+ // override namespace by configuration file
+ v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
Map customLabels = Maps.newHashMap();
customLabels.put(TENANT_LABEL_CLAIM, tenant);
customLabels.put(NAMESPACE_LABEL_CLAIM, namespace);
@@ -112,35 +116,7 @@ public void registerFunction(final String tenant,
pod.setLabels(customLabels);
v1alpha1Function.getSpec().setPod(pod);
try {
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
- functionDetailsBuilder.setTenant(tenant);
- functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(functionName);
- worker().getAuthProvider().ifPresent(functionAuthProvider -> {
- if (clientAuthenticationDataHttps != null) {
- try {
- String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
- v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
- String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
- v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
- } catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
-
-
- throw new RestException(
- Response.Status.INTERNAL_SERVER_ERROR,
- String.format("Error caching authentication data for %s %s:- %s",
- ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
- }
- }
- });
- }
+ this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps);
Call call = worker().getCustomObjectsApi().createNamespacedCustomObjectCall(
group,
version,
@@ -191,6 +167,7 @@ public void updateFunction(final String tenant,
functionConfig
);
v1alpha1Function.getMetadata().setResourceVersion(oldFn.getMetadata().getResourceVersion());
+ this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps);
Call replaceCall = worker().getCustomObjectsApi().replaceNamespacedCustomObjectCall(
group,
version,
@@ -304,7 +281,51 @@ public void updateFunctionOnWorkerLeader(final String tenant,
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
- final String clientRole) {
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+
+ }
+ private void upsertFunction(final String tenant,
+ final String namespace,
+ final String functionName,
+ final FunctionConfig functionConfig,
+ V1alpha1Function v1alpha1Function,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null) {
+ try {
+
+ Map functionsWorkerServiceCustomConfigs = worker()
+ .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
+ Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
+ if (volumes != null) {
+ List volumesList = (List) volumes;
+ v1alpha1Function.getSpec().getPod().setVolumes(volumesList);
+ }
+ Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
+ if (volumeMounts != null) {
+ List volumeMountsList = (List) volumeMounts;
+ v1alpha1Function.getSpec().setVolumeMounts(volumeMountsList);
+ }
+ String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
+ v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
+ String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
+ v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
+ } catch (Exception e) {
+ log.error("Error create or update auth or tls secret for {} {}/{}/{}",
+ ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+ String.format("Error create or update auth or tls secret for %s %s:- %s",
+ ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+ }
+ }
+ }
}
}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java
index aab62acf2..475798bf4 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java
@@ -348,7 +348,8 @@ public void putFunctionState(final String tenant,
@Override
public void uploadFunction(final InputStream uploadedInputStream,
final String path,
- String clientRole) {
+ String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
}
@@ -376,7 +377,7 @@ public List getListOfConnectors() {
}
@Override
- public void reloadConnectors(String clientRole) {
+ public void reloadConnectors(String clientRole, final AuthenticationDataSource clientAuthenticationDataHttps) {
meshWorkerServiceSupplier.get().getConnectorsManager().reloadConnectors();
}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java
index 63ec77c37..672349900 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Maps;
import io.functionmesh.compute.sinks.models.V1alpha1Sink;
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPod;
+import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumeMounts;
+import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumes;
import io.functionmesh.compute.util.KubernetesUtils;
import io.functionmesh.compute.util.SinksUtil;
import io.functionmesh.compute.MeshWorkerService;
@@ -40,7 +42,6 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.service.api.Sinks;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.net.URI;
@@ -101,8 +102,7 @@ public void registerSink(
clientAuthenticationDataHttps,
ComponentTypeUtils.toString(componentType));
this.validateTenantIsExist(tenant, namespace, sinkName, clientRole);
- V1alpha1Sink v1alpha1Sink;
- v1alpha1Sink =
+ V1alpha1Sink v1alpha1Sink =
SinksUtil.createV1alpha1SkinFromSinkConfig(
kind,
group,
@@ -112,6 +112,7 @@ public void registerSink(
uploadedInputStream,
sinkConfig,
this.meshWorkerServiceSupplier.get().getConnectorsManager());
+ // override namesapce by configuration
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
try {
Map customLabels = Maps.newHashMap();
@@ -125,33 +126,7 @@ public void registerSink(
}
pod.setLabels(customLabels);
v1alpha1Sink.getSpec().setPod(pod);
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
- functionDetailsBuilder.setTenant(tenant);
- functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(sinkName);
- worker().getAuthProvider().ifPresent(functionAuthProvider -> {
- if (clientAuthenticationDataHttps != null) {
- try {
- String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
- v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
- String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
- v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
- } catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
-
-
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s",
- ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
- }
- }
- });
- }
+ this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps);
Call call =
worker().getCustomObjectsApi()
.createNamespacedCustomObjectCall(
@@ -211,6 +186,7 @@ public void updateSink(
sinkPkgUrl,
uploadedInputStream,
sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager());
+ this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps);
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
v1alpha1Sink
.getMetadata()
@@ -355,4 +331,46 @@ public List getSinkList() {
public List getSinkConfigDefinition(String name) {
return new ArrayList<>();
}
+
+ private void upsertSink(final String tenant,
+ final String namespace,
+ final String sinkName,
+ final SinkConfig sinkConfig,
+ V1alpha1Sink v1alpha1Sink,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Map functionsWorkerServiceCustomConfigs = worker()
+ .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
+ Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
+ if (volumes != null) {
+ List volumesList = (List) volumes;
+ v1alpha1Sink.getSpec().getPod().setVolumes(volumesList);
+ }
+ Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
+ if (volumeMounts != null) {
+ List volumeMountsList = (List) volumeMounts;
+ v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList);
+ }
+ String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
+ v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
+ String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
+ v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
+ } catch (Exception e) {
+ log.error("Error create or update auth or tls secret data for {} {}/{}/{}",
+ ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+ String.format("Error create or update auth or tls secret for %s %s:- %s",
+ ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+ }
+ }
+ }
+ }
}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java
index 91f0f00b3..edeefe7ba 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java
@@ -20,6 +20,8 @@
import com.google.common.collect.Maps;
import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPod;
+import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumeMounts;
+import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumes;
import io.functionmesh.compute.util.KubernetesUtils;
import io.functionmesh.compute.util.SourcesUtil;
import io.functionmesh.compute.MeshWorkerService;
@@ -40,14 +42,12 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.service.api.Sources;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Supplier;
@Slf4j
@@ -112,9 +112,18 @@ public void registerSource(final String tenant,
ComponentTypeUtils.toString(componentType));
this.validateTenantIsExist(tenant, namespace, sourceName, clientRole);
try {
- V1alpha1Source v1alpha1Source = SourcesUtil.createV1alpha1SourceFromSourceConfig(kind, group, version,
- sourceName, sourcePkgUrl, uploadedInputStream, sourceConfig,
- this.meshWorkerServiceSupplier.get().getConnectorsManager());
+ V1alpha1Source v1alpha1Source = SourcesUtil
+ .createV1alpha1SourceFromSourceConfig(
+ kind,
+ group,
+ version,
+ sourceName,
+ sourcePkgUrl,
+ uploadedInputStream,
+ sourceConfig,
+ this.meshWorkerServiceSupplier.get().getConnectorsManager());
+ // override namesapce by configuration
+ v1alpha1Source.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
Map customLabels = Maps.newHashMap();
customLabels.put(CLUSTER_LABEL_CLAIM, v1alpha1Source.getSpec().getClusterName());
customLabels.put(TENANT_LABEL_CLAIM, tenant);
@@ -126,34 +135,7 @@ public void registerSource(final String tenant,
}
pod.setLabels(customLabels);
v1alpha1Source.getSpec().setPod(pod);
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
- functionDetailsBuilder.setTenant(tenant);
- functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(sourceName);
- Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
- worker().getAuthProvider().ifPresent(functionAuthProvider -> {
- if (clientAuthenticationDataHttps != null) {
- try {
- String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
- v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Source.getSpec().getPulsar().setAuthSecret(authSecretName);
- String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
- v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
- worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
- v1alpha1Source.getSpec().getPulsar().setTlsSecret(tlsSecretName);
- } catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
-
-
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s",
- ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
- }
- }
- });
- }
+ this.upsertSource(tenant, namespace, sourceName, sourceConfig, v1alpha1Source, clientAuthenticationDataHttps);
Call call = worker().getCustomObjectsApi().createNamespacedCustomObjectCall(
group, version, KubernetesUtils.getNamespace(worker().getFactoryConfig()),
plural,
@@ -186,27 +168,6 @@ public void updateSource(final String tenant,
clientAuthenticationDataHttps,
ComponentTypeUtils.toString(componentType));
try {
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
- functionDetailsBuilder.setTenant(tenant);
- functionDetailsBuilder.setNamespace(namespace);
- functionDetailsBuilder.setName(sourceName);
- Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
- worker().getAuthProvider().ifPresent(functionAuthProvider -> {
- if (clientAuthenticationDataHttps != null) {
- try {
- functionAuthProvider.updateAuthData(functionDetails, Optional.empty(), clientAuthenticationDataHttps);
- } catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
-
-
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s",
- ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
- }
- }
- });
- }
Call getCall = worker().getCustomObjectsApi().getNamespacedCustomObjectCall(
group,
version,
@@ -228,6 +189,7 @@ public void updateSource(final String tenant,
this.meshWorkerServiceSupplier.get().getConnectorsManager()
);
v1alpha1Source.getMetadata().setResourceVersion(oldRes.getMetadata().getResourceVersion());
+ this.upsertSource(tenant, namespace, sourceName, sourceConfig, v1alpha1Source, clientAuthenticationDataHttps);
Call replaceCall = worker().getCustomObjectsApi().replaceNamespacedCustomObjectCall(
group,
version,
@@ -339,4 +301,46 @@ public List getSourceList() {
public List getSourceConfigDefinition(String name) {
return new ArrayList<>();
}
+
+ private void upsertSource(final String tenant,
+ final String namespace,
+ final String sourceName,
+ SourceConfig sourceConfig,
+ V1alpha1Source v1alpha1Source,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Map functionsWorkerServiceCustomConfigs = worker()
+ .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
+ Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
+ if (volumes != null) {
+ List volumesList = (List) volumes;
+ v1alpha1Source.getSpec().getPod().setVolumes(volumesList);
+ }
+ Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
+ if (volumeMounts != null) {
+ List volumeMountsList = (List) volumeMounts;
+ v1alpha1Source.getSpec().setVolumeMounts(volumeMountsList);
+ }
+ String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
+ v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Source.getSpec().getPulsar().setAuthSecret(authSecretName);
+ String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
+ v1alpha1Source.getSpec().getClusterName(), tenant, namespace, sourceName,
+ worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
+ v1alpha1Source.getSpec().getPulsar().setTlsSecret(tlsSecretName);
+ } catch (Exception e) {
+ log.error("Error create or update auth or tls secret for {} {}/{}/{}",
+ ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+ String.format("Error create or update auth or tls secret %s %s:- %s",
+ ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+ }
+ }
+ }
}
diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java
index 36d030b94..43afb91d5 100644
--- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java
+++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java
@@ -20,7 +20,6 @@
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.apache.pulsar.common.functions.FunctionConfig;
-
import java.util.Map;
import java.util.stream.Collectors;
@@ -80,4 +79,5 @@ public static Map transformedMapValueToObject(Map