From 2467e2194ca42c55523c66da5977a30175248631 Mon Sep 17 00:00:00 2001 From: guangning Date: Mon, 17 May 2021 18:19:18 +0800 Subject: [PATCH 1/8] Fixed namespace Support mount secret file --- mesh-worker-service/pom.xml | 4 +-- .../compute/rest/api/FunctionsImpl.java | 5 ++- .../compute/rest/api/MeshComponentImpl.java | 5 +-- .../compute/rest/api/SinksImpl.java | 18 ++++++++-- .../compute/rest/api/SourcesImpl.java | 33 ++++++++++++++++--- .../functionmesh/compute/util/CommonUtil.java | 2 +- .../src/main/resources/functions_worker.yml | 16 ++++++++- 7 files changed, 70 insertions(+), 13 deletions(-) diff --git a/mesh-worker-service/pom.xml b/mesh-worker-service/pom.xml index d2d345703..9b54016f1 100644 --- a/mesh-worker-service/pom.xml +++ b/mesh-worker-service/pom.xml @@ -29,7 +29,7 @@ v0.1.5-SNAPSHOT - 2.8.0-rc-202103292206 + 2.8.0-rc-202105140121 1.18.16 2.14.0 10.0.1 @@ -42,7 +42,7 @@ - org.apache.pulsar + io.streamnative pulsar-functions-worker ${pulsar.version} diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java index 8d6d21343..753a9b9b5 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java @@ -102,6 +102,8 @@ public void registerFunction(final String tenant, functionPkgUrl, functionConfig ); + // override namespace by configuration file + v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig())); Map customLabels = Maps.newHashMap(); customLabels.put(TENANT_LABEL_CLAIM, tenant); customLabels.put(NAMESPACE_LABEL_CLAIM, namespace); @@ -299,7 +301,8 @@ public void updateFunctionOnWorkerLeader(final String tenant, final InputStream uploadedInputStream, final boolean delete, URI uri, - final String clientRole) { + final String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps) { } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java index 3a8743760..0c200ad9b 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/MeshComponentImpl.java @@ -317,7 +317,8 @@ public void putFunctionState(final String tenant, @Override public void uploadFunction(final InputStream uploadedInputStream, final String path, - String clientRole) { + String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps) { } @@ -345,7 +346,7 @@ public List getListOfConnectors() { } @Override - public void reloadConnectors(String clientRole) { + public void reloadConnectors(String clientRole, final AuthenticationDataSource clientAuthenticationDataHttps) { meshWorkerServiceSupplier.get().getConnectorsManager().reloadConnectors(); } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java index cd5e02853..40be9ecb9 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java @@ -21,6 +21,8 @@ import com.google.common.collect.Maps; import io.functionmesh.compute.sinks.models.V1alpha1Sink; import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPod; +import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumeMounts; +import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumes; import io.functionmesh.compute.util.KubernetesUtils; import io.functionmesh.compute.util.SinksUtil; import io.functionmesh.compute.MeshWorkerService; @@ -105,8 +107,7 @@ public void registerSink( clientAuthenticationDataHttps, ComponentTypeUtils.toString(componentType)); this.validateTenantIsExist(tenant, namespace, sinkName, clientRole); - V1alpha1Sink v1alpha1Sink; - v1alpha1Sink = + V1alpha1Sink v1alpha1Sink = SinksUtil.createV1alpha1SkinFromSinkConfig( kind, group, @@ -116,6 +117,7 @@ public void registerSink( uploadedInputStream, sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager()); + // override namesapce by configuration v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig())); try { Map customLabels = Maps.newHashMap(); @@ -138,6 +140,18 @@ public void registerSink( String type = "auth"; KubernetesUtils.createConfigMap(type, tenant, namespace, sinkName, worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); + Map functionsWorkerServiceCustomConfigs = worker() + .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); + Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); + if (volumes != null) { + List volumesList = (List) volumes; + v1alpha1Sink.getSpec().getPod().setVolumes(volumesList); + } + Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); + if (volumeMounts != null) { + List volumeMountsList = (List) volumeMounts; + v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList); + } v1alpha1Sink.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( type, sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkName)); } catch (Exception e) { diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java index c5df0e07a..edd658ea5 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java @@ -19,7 +19,10 @@ package io.functionmesh.compute.rest.api; import com.google.common.collect.Maps; +import com.google.gson.Gson; import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPod; +import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumeMounts; +import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumes; import io.functionmesh.compute.util.KubernetesUtils; import io.functionmesh.compute.util.SourcesUtil; import io.functionmesh.compute.MeshWorkerService; @@ -112,9 +115,18 @@ public void registerSource(final String tenant, ComponentTypeUtils.toString(componentType)); this.validateTenantIsExist(tenant, namespace, sourceName, clientRole); try { - V1alpha1Source v1alpha1Source = SourcesUtil.createV1alpha1SourceFromSourceConfig(kind, group, version, - sourceName, sourcePkgUrl, uploadedInputStream, sourceConfig, - this.meshWorkerServiceSupplier.get().getConnectorsManager()); + V1alpha1Source v1alpha1Source = SourcesUtil + .createV1alpha1SourceFromSourceConfig( + kind, + group, + version, + sourceName, + sourcePkgUrl, + uploadedInputStream, + sourceConfig, + this.meshWorkerServiceSupplier.get().getConnectorsManager()); + // override namesapce by configuration + v1alpha1Source.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig())); Map customLabels = Maps.newHashMap(); customLabels.put(TENANT_LABEL_CLAIM, tenant); customLabels.put(NAMESPACE_LABEL_CLAIM, namespace); @@ -135,7 +147,20 @@ public void registerSource(final String tenant, try { String type = "auth"; KubernetesUtils.createConfigMap(type, tenant, namespace, sourceName, - worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); + worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); + Map functionsWorkerServiceCustomConfigs = worker() + .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); + Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); + if (volumes != null) { + List volumesList = (List) volumes; + v1alpha1Source.getSpec().getPod().setVolumes(volumesList); + } + Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); + if (volumeMounts != null) { + List volumeMountsList = (List) volumeMounts; + v1alpha1Source.getSpec().setVolumeMounts(volumeMountsList); + } + v1alpha1Source.getSpec().getPod().getVolumes(); v1alpha1Source.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( type, sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceName)); } catch (Exception e) { diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java index 6125d6d4e..36d030b94 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java @@ -36,7 +36,7 @@ public static String getDefaultPulsarConfig() { } public static String getPulsarClusterConfigMapName(String cluster) { - return toValidResourceName(String.format("%s-pulsar-config-map", cluster)); // Need to manage the configMap for each Pulsar Cluster + return toValidResourceName(String.format("%s-function-mesh-config", cluster)); // Need to manage the configMap for each Pulsar Cluster } public static String getPulsarClusterAuthConfigMapName(String cluster) { diff --git a/mesh-worker-service/src/main/resources/functions_worker.yml b/mesh-worker-service/src/main/resources/functions_worker.yml index 717236c28..a73b5df5c 100644 --- a/mesh-worker-service/src/main/resources/functions_worker.yml +++ b/mesh-worker-service/src/main/resources/functions_worker.yml @@ -241,4 +241,18 @@ connectorsDirectory: ./connectors functionsDirectory: ./functions # Should connector config be validated during during submission -validateConnectorConfig: false \ No newline at end of file +validateConnectorConfig: false + +functionsWorkerServiceCustomConfigs: + volumeMounts: + - mountPath: /mnt/secrets + name: secret-pulsarcluster-data + readOnly: true + volumes: + volumes: + - name: secret-pulsarcluster-data + secret: + defaultMode: 420 + secretName: pulsarcluster-data + authConfigMap: + name: function-mesh-configmap-auth-public-default-testd From dfbc33b320367891d7ff22e6a9dc833e7b1dcc22 Mon Sep 17 00:00:00 2001 From: guangning Date: Mon, 17 May 2021 22:50:02 +0800 Subject: [PATCH 2/8] Fixed auth config map --- .../compute/MeshWorkerService.java | 12 -- .../compute/rest/api/FunctionsImpl.java | 78 ++++++++---- .../compute/rest/api/SinksImpl.java | 92 +++++++------- .../compute/rest/api/SourcesImpl.java | 112 ++++++++---------- .../functionmesh/compute/util/CommonUtil.java | 2 +- .../compute/util/KubernetesUtils.java | 41 ++++--- .../src/main/resources/functions_worker.yml | 3 +- 7 files changed, 182 insertions(+), 158 deletions(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java index 3bb3bd76f..20584fea9 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/MeshWorkerService.java @@ -28,7 +28,6 @@ import io.kubernetes.client.util.Config; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -38,11 +37,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.util.SimpleTextOutputStream; -import org.apache.pulsar.functions.auth.FunctionAuthProvider; -import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.runtime.RuntimeUtils; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig; -import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.ErrorNotifier; import org.apache.pulsar.functions.worker.PulsarWorkerService; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -76,7 +72,6 @@ public class MeshWorkerService implements WorkerService { private CustomObjectsApi customObjectsApi; private ApiClient apiClient; private PulsarAdmin brokerAdmin; - private Optional authProvider; private KubernetesRuntimeFactoryConfig factoryConfig; private AuthenticationService authenticationService; @@ -138,13 +133,6 @@ public void init(WorkerConfig workerConfig) throws Exception { this.workerConfig = workerConfig; this.initKubernetesClient(); this.authenticationEnabled = this.workerConfig.isAuthenticationEnabled(); - if (this.workerConfig.isAuthenticationEnabled() && !StringUtils.isEmpty(this.workerConfig.getFunctionAuthProviderClassName())) { - Optional functionAuthProvider = Optional.empty(); - functionAuthProvider = Optional.of(FunctionAuthProvider.getAuthProvider(workerConfig.getFunctionAuthProviderClassName())); - KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get(); - kubernetesFunctionAuthProvider.initialize(coreV1Api, null, null); - this.authProvider = Optional.of(kubernetesFunctionAuthProvider); - } this.functions = new FunctionsImpl(() -> MeshWorkerService.this); this.sources = new SourcesImpl(() -> MeshWorkerService.this); this.sinks = new SinksImpl(() -> MeshWorkerService.this); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java index 753a9b9b5..95f962884 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java @@ -20,6 +20,8 @@ import com.google.common.collect.Maps; import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPod; +import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumeMounts; +import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumes; import io.functionmesh.compute.util.FunctionsUtil; import io.functionmesh.compute.functions.models.V1alpha1Function; import io.functionmesh.compute.MeshWorkerService; @@ -36,10 +38,10 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils; import org.apache.pulsar.functions.worker.service.api.Functions; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; - import javax.ws.rs.core.Response; import java.io.InputStream; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -114,30 +116,7 @@ public void registerFunction(final String tenant, pod.setLabels(customLabels); v1alpha1Function.getSpec().setPod(pod); try { - if (worker().getWorkerConfig().isAuthenticationEnabled()) { - Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); - functionDetailsBuilder.setTenant(tenant); - functionDetailsBuilder.setNamespace(namespace); - functionDetailsBuilder.setName(functionName); - worker().getAuthProvider().ifPresent(functionAuthProvider -> { - if (clientAuthenticationDataHttps != null) { - try { - String type = "auth"; - KubernetesUtils.createConfigMap(type, tenant, namespace, functionName, - worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); - v1alpha1Function.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( - type, functionConfig.getTenant(), functionConfig.getNamespace(), functionName)); - } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", - ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e); - - - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", - ComponentTypeUtils.toString(componentType), functionName, e.getMessage())); - } - } - }); - } + this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps); Call call = worker().getCustomObjectsApi().createNamespacedCustomObjectCall( group, version, @@ -188,6 +167,7 @@ public void updateFunction(final String tenant, functionConfig ); v1alpha1Function.getMetadata().setResourceVersion(oldFn.getMetadata().getResourceVersion()); + this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps); Call replaceCall = worker().getCustomObjectsApi().replaceNamespacedCustomObjectCall( group, version, @@ -305,4 +285,52 @@ public void updateFunctionOnWorkerLeader(final String tenant, final AuthenticationDataSource clientAuthenticationDataHttps) { } + + private void upsertFunction(final String tenant, + final String namespace, + final String functionName, + final FunctionConfig functionConfig, + V1alpha1Function v1alpha1Function, + AuthenticationDataHttps clientAuthenticationDataHttps) { + if (worker().getWorkerConfig().isAuthenticationEnabled()) { + if (clientAuthenticationDataHttps != null) { + try { + Map functionsWorkerServiceCustomConfigs = worker() + .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); + String type = "auth"; + Object authConfigMapName = functionsWorkerServiceCustomConfigs.get("authConfigMap"); + String configMapName = KubernetesUtils.getConfigMapName( + type, functionConfig.getTenant(), functionConfig.getNamespace(), functionName); + if (authConfigMapName != null) { + configMapName = (String) authConfigMapName; + } + KubernetesUtils.upsertConfigMap( + tenant, namespace, functionName, + worker().getWorkerConfig(), + worker().getCoreV1Api(), + worker().getFactoryConfig(), + configMapName); + Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); + if (volumes != null) { + List volumesList = (List) volumes; + v1alpha1Function.getSpec().getPod().setVolumes(volumesList); + } + Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); + if (volumeMounts != null) { + List volumeMountsList = (List) volumeMounts; + v1alpha1Function.getSpec().setVolumeMounts(volumeMountsList); + } + v1alpha1Function.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( + type, functionConfig.getTenant(), functionConfig.getNamespace(), functionName)); + } catch (Exception e) { + log.error("Error caching authentication data for {} {}/{}/{}", + ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e); + + + throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", + ComponentTypeUtils.toString(componentType), functionName, e.getMessage())); + } + } + } + } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java index 40be9ecb9..6156dcec6 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java @@ -26,9 +26,7 @@ import io.functionmesh.compute.util.KubernetesUtils; import io.functionmesh.compute.util.SinksUtil; import io.functionmesh.compute.MeshWorkerService; -import io.functionmesh.compute.sinks.models.V1alpha1Sink; import io.functionmesh.compute.sinks.models.V1alpha1SinkStatus; -import io.functionmesh.compute.util.SinksUtil; import lombok.extern.slf4j.Slf4j; import okhttp3.Call; import org.apache.commons.lang3.StringUtils; @@ -44,12 +42,9 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils; import org.apache.pulsar.functions.worker.service.api.Sinks; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; - import javax.ws.rs.core.Response; -import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -129,42 +124,7 @@ public void registerSink( } pod.setLabels(customLabels); v1alpha1Sink.getSpec().setPod(pod); - if (worker().getWorkerConfig().isAuthenticationEnabled()) { - Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); - functionDetailsBuilder.setTenant(tenant); - functionDetailsBuilder.setNamespace(namespace); - functionDetailsBuilder.setName(sinkName); - worker().getAuthProvider().ifPresent(functionAuthProvider -> { - if (clientAuthenticationDataHttps != null) { - try { - String type = "auth"; - KubernetesUtils.createConfigMap(type, tenant, namespace, sinkName, - worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); - Map functionsWorkerServiceCustomConfigs = worker() - .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); - Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); - if (volumes != null) { - List volumesList = (List) volumes; - v1alpha1Sink.getSpec().getPod().setVolumes(volumesList); - } - Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); - if (volumeMounts != null) { - List volumeMountsList = (List) volumeMounts; - v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList); - } - v1alpha1Sink.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( - type, sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkName)); - } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", - ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e); - - - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", - ComponentTypeUtils.toString(componentType), sinkName, e.getMessage())); - } - } - }); - } + this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps); Call call = worker().getCustomObjectsApi() .createNamespacedCustomObjectCall( @@ -223,6 +183,7 @@ public void updateSink( sinkPkgUrl, uploadedInputStream, sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager()); + this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps); v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig())); v1alpha1Sink .getMetadata() @@ -367,4 +328,53 @@ public List getSinkList() { public List getSinkConfigDefinition(String name) { return new ArrayList<>(); } + + private void upsertSink(final String tenant, + final String namespace, + final String sinkName, + final SinkConfig sinkConfig, + V1alpha1Sink v1alpha1Sink, + AuthenticationDataHttps clientAuthenticationDataHttps) { + if (worker().getWorkerConfig().isAuthenticationEnabled()) { + if (clientAuthenticationDataHttps != null) { + try { + Map functionsWorkerServiceCustomConfigs = worker() + .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); + Object authConfigMapName = functionsWorkerServiceCustomConfigs.get("authConfigMap"); + String type = "auth"; + String configMapName = KubernetesUtils.getConfigMapName( + type, sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkName); + if (authConfigMapName != null) { + configMapName = (String) authConfigMapName; + } + KubernetesUtils.upsertConfigMap( + tenant, + namespace, + sinkName, + worker().getWorkerConfig(), worker().getCoreV1Api(), + worker().getFactoryConfig(), + configMapName); + Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); + if (volumes != null) { + List volumesList = (List) volumes; + v1alpha1Sink.getSpec().getPod().setVolumes(volumesList); + } + Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); + if (volumeMounts != null) { + List volumeMountsList = (List) volumeMounts; + v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList); + } + + v1alpha1Sink.getSpec().getPulsar().setAuthConfig(configMapName); + } catch (Exception e) { + log.error("Error caching authentication data for {} {}/{}/{}", + ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e); + + + throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", + ComponentTypeUtils.toString(componentType), sinkName, e.getMessage())); + } + } + } + } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java index edd658ea5..06ecb67a6 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java @@ -19,7 +19,6 @@ package io.functionmesh.compute.rest.api; import com.google.common.collect.Maps; -import com.google.gson.Gson; import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPod; import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumeMounts; import io.functionmesh.compute.sources.models.V1alpha1SourceSpecPodVolumes; @@ -43,14 +42,12 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils; import org.apache.pulsar.functions.worker.service.api.Sources; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; - import javax.ws.rs.core.Response; import java.io.InputStream; import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Supplier; @Slf4j @@ -136,44 +133,7 @@ public void registerSource(final String tenant, } pod.setLabels(customLabels); v1alpha1Source.getSpec().setPod(pod); - if (worker().getWorkerConfig().isAuthenticationEnabled()) { - Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); - functionDetailsBuilder.setTenant(tenant); - functionDetailsBuilder.setNamespace(namespace); - functionDetailsBuilder.setName(sourceName); - Function.FunctionDetails functionDetails = functionDetailsBuilder.build(); - worker().getAuthProvider().ifPresent(functionAuthProvider -> { - if (clientAuthenticationDataHttps != null) { - try { - String type = "auth"; - KubernetesUtils.createConfigMap(type, tenant, namespace, sourceName, - worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); - Map functionsWorkerServiceCustomConfigs = worker() - .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); - Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); - if (volumes != null) { - List volumesList = (List) volumes; - v1alpha1Source.getSpec().getPod().setVolumes(volumesList); - } - Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); - if (volumeMounts != null) { - List volumeMountsList = (List) volumeMounts; - v1alpha1Source.getSpec().setVolumeMounts(volumeMountsList); - } - v1alpha1Source.getSpec().getPod().getVolumes(); - v1alpha1Source.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( - type, sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceName)); - } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", - ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e); - - - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", - ComponentTypeUtils.toString(componentType), sourceName, e.getMessage())); - } - } - }); - } + this.upsertSource(tenant, namespace, sourceName, sourceConfig, v1alpha1Source, clientAuthenticationDataHttps); Call call = worker().getCustomObjectsApi().createNamespacedCustomObjectCall( group, version, KubernetesUtils.getNamespace(worker().getFactoryConfig()), plural, @@ -206,27 +166,6 @@ public void updateSource(final String tenant, clientAuthenticationDataHttps, ComponentTypeUtils.toString(componentType)); try { - if (worker().getWorkerConfig().isAuthenticationEnabled()) { - Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); - functionDetailsBuilder.setTenant(tenant); - functionDetailsBuilder.setNamespace(namespace); - functionDetailsBuilder.setName(sourceName); - Function.FunctionDetails functionDetails = functionDetailsBuilder.build(); - worker().getAuthProvider().ifPresent(functionAuthProvider -> { - if (clientAuthenticationDataHttps != null) { - try { - functionAuthProvider.updateAuthData(functionDetails, Optional.empty(), clientAuthenticationDataHttps); - } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", - ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e); - - - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", - ComponentTypeUtils.toString(componentType), sourceName, e.getMessage())); - } - } - }); - } Call getCall = worker().getCustomObjectsApi().getNamespacedCustomObjectCall( group, version, @@ -248,6 +187,7 @@ public void updateSource(final String tenant, this.meshWorkerServiceSupplier.get().getConnectorsManager() ); v1alpha1Source.getMetadata().setResourceVersion(oldRes.getMetadata().getResourceVersion()); + this.upsertSource(tenant, namespace, sourceName, sourceConfig, v1alpha1Source, clientAuthenticationDataHttps); Call replaceCall = worker().getCustomObjectsApi().replaceNamespacedCustomObjectCall( group, version, @@ -359,4 +299,52 @@ public List getSourceList() { public List getSourceConfigDefinition(String name) { return new ArrayList<>(); } + + private void upsertSource(final String tenant, + final String namespace, + final String sourceName, + SourceConfig sourceConfig, + V1alpha1Source v1alpha1Source, + AuthenticationDataHttps clientAuthenticationDataHttps) { + if (worker().getWorkerConfig().isAuthenticationEnabled()) { + if (clientAuthenticationDataHttps != null) { + try { + Map functionsWorkerServiceCustomConfigs = worker() + .getWorkerConfig().getFunctionsWorkerServiceCustomConfigs(); + String type = "auth"; + Object authConfigMapName = functionsWorkerServiceCustomConfigs.get("authConfigMap"); + String configMapName = KubernetesUtils.getConfigMapName( + type, sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceName); + if (authConfigMapName != null) { + configMapName = (String) authConfigMapName; + } + KubernetesUtils.upsertConfigMap( + tenant, namespace, sourceName, + worker().getWorkerConfig(), + worker().getCoreV1Api(), + worker().getFactoryConfig(), + configMapName); + Object volumes = functionsWorkerServiceCustomConfigs.get("volumes"); + if (volumes != null) { + List volumesList = (List) volumes; + v1alpha1Source.getSpec().getPod().setVolumes(volumesList); + } + Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts"); + if (volumeMounts != null) { + List volumeMountsList = (List) volumeMounts; + v1alpha1Source.getSpec().setVolumeMounts(volumeMountsList); + } + v1alpha1Source.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( + type, sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceName)); + } catch (Exception e) { + log.error("Error creating authentication data for {} {}/{}/{}", + ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e); + + + throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", + ComponentTypeUtils.toString(componentType), sourceName, e.getMessage())); + } + } + } + } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java index 36d030b94..43afb91d5 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java @@ -20,7 +20,6 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; import org.apache.pulsar.common.functions.FunctionConfig; - import java.util.Map; import java.util.stream.Collectors; @@ -80,4 +79,5 @@ public static Map transformedMapValueToObject(Map buildConfigMap(WorkerConfig workerConfig) { return valueMap; } - public static String createConfigMap( - String type, + public static String upsertConfigMap( String tenant, String namespace, String name, WorkerConfig workerConfig, CoreV1Api coreV1Api, - KubernetesRuntimeFactoryConfig factoryConfig) throws ApiException, InterruptedException { - - String configMapName = getConfigMapName(type, tenant, namespace, name); + KubernetesRuntimeFactoryConfig factoryConfig, + String authConfigMapName) throws InterruptedException { StringBuilder sb = new StringBuilder(); Actions.Action createAuthConfigMap = Actions.Action.builder() - .actionName(String.format("Creating authentication config map for function %s/%s/%s", tenant, namespace, name)) + .actionName(String.format( + "Creating authentication config map for function %s/%s/%s", tenant, namespace, name)) .numRetries(NUM_RETRIES) .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) .supplier(() -> { String id = RandomStringUtils.random(5, true, true).toLowerCase(); V1ConfigMap v1ConfigMap = new V1ConfigMap() - .metadata(new V1ObjectMeta().name(configMapName)) + .metadata(new V1ObjectMeta().name(authConfigMapName)) .data(buildConfigMap(workerConfig)); try { - coreV1Api.createNamespacedConfigMap(KubernetesUtils.getNamespace(factoryConfig), v1ConfigMap, null, null, null); + coreV1Api.createNamespacedConfigMap( + KubernetesUtils.getNamespace(factoryConfig), + v1ConfigMap, null, null, null); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { - return Actions.ActionResult.builder() - .errorMsg(String.format("ConfigMap %s already present", id)) - .success(false) - .build(); + try { + coreV1Api.replaceNamespacedConfigMap( + authConfigMapName, + KubernetesUtils.getNamespace(factoryConfig), + v1ConfigMap, null, null, null); + return Actions.ActionResult.builder().success(true).build(); + } catch (ApiException e1) { + String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); + return Actions.ActionResult.builder() + .success(false) + .errorMsg(errorMsg) + .build(); + } } String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); @@ -144,7 +152,10 @@ public static String createConfigMap( .run(); if (!success.get()) { - throw new RuntimeException(String.format("Failed to create authentication configmap for function %s/%s/%s", tenant, namespace, name)); + throw new RuntimeException( + String.format( + "Failed to create authentication configmap for function %s/%s/%s", + tenant, namespace, name)); } return sb.toString(); diff --git a/mesh-worker-service/src/main/resources/functions_worker.yml b/mesh-worker-service/src/main/resources/functions_worker.yml index a73b5df5c..a454a923c 100644 --- a/mesh-worker-service/src/main/resources/functions_worker.yml +++ b/mesh-worker-service/src/main/resources/functions_worker.yml @@ -254,5 +254,4 @@ functionsWorkerServiceCustomConfigs: secret: defaultMode: 420 secretName: pulsarcluster-data - authConfigMap: - name: function-mesh-configmap-auth-public-default-testd + authConfigMap: function-mesh-configmap-auth-public-default-testd From 230a49f7a7977671960faa6a06437d7be2ca2a99 Mon Sep 17 00:00:00 2001 From: guangning Date: Mon, 17 May 2021 23:01:13 +0800 Subject: [PATCH 3/8] Fixed comment --- .../java/io/functionmesh/compute/rest/api/FunctionsImpl.java | 2 +- .../main/java/io/functionmesh/compute/rest/api/SinksImpl.java | 2 +- .../main/java/io/functionmesh/compute/rest/api/SourcesImpl.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java index 95f962884..03d58487c 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java @@ -323,7 +323,7 @@ private void upsertFunction(final String tenant, v1alpha1Function.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( type, functionConfig.getTenant(), functionConfig.getNamespace(), functionName)); } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", + log.error("Error create or update authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java index 6156dcec6..d931f30ea 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java @@ -367,7 +367,7 @@ private void upsertSink(final String tenant, v1alpha1Sink.getSpec().getPulsar().setAuthConfig(configMapName); } catch (Exception e) { - log.error("Error caching authentication data for {} {}/{}/{}", + log.error("Error create or update authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java index 06ecb67a6..771ee0b97 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java @@ -337,7 +337,7 @@ private void upsertSource(final String tenant, v1alpha1Source.getSpec().getPulsar().setAuthConfig(KubernetesUtils.getConfigMapName( type, sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceName)); } catch (Exception e) { - log.error("Error creating authentication data for {} {}/{}/{}", + log.error("Error create or update authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e); From e3fcd96fe763ffb8e210e1c7ee5f92f2a9535a4d Mon Sep 17 00:00:00 2001 From: guangning Date: Wed, 19 May 2021 21:28:48 +0800 Subject: [PATCH 4/8] Fixed function mesh configuration file --- mesh-worker-service/src/main/resources/functions_worker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mesh-worker-service/src/main/resources/functions_worker.yml b/mesh-worker-service/src/main/resources/functions_worker.yml index a454a923c..45bc188b4 100644 --- a/mesh-worker-service/src/main/resources/functions_worker.yml +++ b/mesh-worker-service/src/main/resources/functions_worker.yml @@ -254,4 +254,4 @@ functionsWorkerServiceCustomConfigs: secret: defaultMode: 420 secretName: pulsarcluster-data - authConfigMap: function-mesh-configmap-auth-public-default-testd + authConfigMap: function-mesh-configmap-auth-testd-data From 06c8ad3bfe00e2e6abdf6eeb50500bb0a921d30b Mon Sep 17 00:00:00 2001 From: guangning Date: Wed, 19 May 2021 22:21:45 +0800 Subject: [PATCH 5/8] Fixed max replicas number --- .../main/java/io/functionmesh/compute/util/FunctionsUtil.java | 1 - .../src/main/java/io/functionmesh/compute/util/SinksUtil.java | 1 - .../src/main/java/io/functionmesh/compute/util/SourcesUtil.java | 1 - 3 files changed, 3 deletions(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java index 63cabbb75..0f3326a73 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java @@ -200,7 +200,6 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k v1alpha1FunctionSpec.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests()); v1alpha1FunctionSpec.setReplicas(functionDetails.getParallelism()); - v1alpha1FunctionSpec.setMaxReplicas(functionDetails.getParallelism()); v1alpha1FunctionSpec.setLogTopic(functionConfig.getLogTopic()); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index 12fc93ce4..720785e17 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -73,7 +73,6 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String Integer parallelism = sinkConfig.getParallelism() == null ? 1 : sinkConfig.getParallelism(); v1alpha1SinkSpec.setReplicas(parallelism); - v1alpha1SinkSpec.setMaxReplicas(parallelism); String customRuntimeOptionsJSON = sinkConfig.getCustomRuntimeOptions(); CustomRuntimeOptions customRuntimeOptions = null; diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index c2b0c38d7..22f782dbb 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -172,7 +172,6 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S } v1alpha1SourceSpec.setReplicas(functionDetails.getParallelism()); - v1alpha1SourceSpec.setMaxReplicas(functionDetails.getParallelism()); double cpu = sourceConfig.getResources() != null && sourceConfig.getResources().getCpu() != 0 ? sourceConfig.getResources().getCpu() : 1; long ramRequest = sourceConfig.getResources() != null && sourceConfig.getResources().getRam() != 0 ? sourceConfig.getResources().getRam() : 1073741824; From 94bb27684d24753f99cc6dad61ce7fd1511dd710 Mon Sep 17 00:00:00 2001 From: guangning Date: Fri, 21 May 2021 09:44:06 +0800 Subject: [PATCH 6/8] Update comment --- .../java/io/functionmesh/compute/rest/api/FunctionsImpl.java | 4 ++-- .../java/io/functionmesh/compute/rest/api/SinksImpl.java | 5 +++-- .../java/io/functionmesh/compute/rest/api/SourcesImpl.java | 5 +++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java index 80849530c..60359a207 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java @@ -317,12 +317,12 @@ private void upsertFunction(final String tenant, worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName); } catch (Exception e) { - log.error("Error create or update authentication data for {} {}/{}/{}", + log.error("Error create or update auth or tls secret for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e); throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, - String.format("Error caching authentication data for %s %s:- %s", + String.format("Error ccreate or update auth or tls secret for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage())); } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java index 366f8f680..672349900 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java @@ -362,11 +362,12 @@ private void upsertSink(final String tenant, worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName); } catch (Exception e) { - log.error("Error create or update authentication data for {} {}/{}/{}", + log.error("Error create or update auth or tls secret data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e); - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", + throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, + String.format("Error create or update auth or tls secret for %s %s:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage())); } } diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java index d1ae89584..edeefe7ba 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java @@ -332,11 +332,12 @@ private void upsertSource(final String tenant, worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig()); v1alpha1Source.getSpec().getPulsar().setTlsSecret(tlsSecretName); } catch (Exception e) { - log.error("Error create or update authentication data for {} {}/{}/{}", + log.error("Error create or update auth or tls secret for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e); - throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", + throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, + String.format("Error create or update auth or tls secret %s %s:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage())); } } From 0220dd3199b56d409affaac662e461e55eb93c86 Mon Sep 17 00:00:00 2001 From: guangning Date: Fri, 21 May 2021 09:52:53 +0800 Subject: [PATCH 7/8] Fixed comment --- .../java/io/functionmesh/compute/rest/api/FunctionsImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java index 60359a207..cc4d1adf4 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java @@ -322,7 +322,7 @@ private void upsertFunction(final String tenant, throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, - String.format("Error ccreate or update auth or tls secret for %s %s:- %s", + String.format("Error create or update auth or tls secret for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage())); } } From 7101970c4dec5763a2bc27dac5422d07e670ca9c Mon Sep 17 00:00:00 2001 From: guangning Date: Fri, 21 May 2021 09:53:51 +0800 Subject: [PATCH 8/8] Delete auth config map --- mesh-worker-service/src/main/resources/functions_worker.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/mesh-worker-service/src/main/resources/functions_worker.yml b/mesh-worker-service/src/main/resources/functions_worker.yml index 45bc188b4..a19bfe613 100644 --- a/mesh-worker-service/src/main/resources/functions_worker.yml +++ b/mesh-worker-service/src/main/resources/functions_worker.yml @@ -254,4 +254,3 @@ functionsWorkerServiceCustomConfigs: secret: defaultMode: 420 secretName: pulsarcluster-data - authConfigMap: function-mesh-configmap-auth-testd-data