diff --git a/.ci/ct.yaml b/.ci/ct.yaml index 08b35e9da..78760974d 100644 --- a/.ci/ct.yaml +++ b/.ci/ct.yaml @@ -1,5 +1,8 @@ check-version-increment: false chart-dirs: - charts -all: true +all: false validate-maintainers: false +helm-extra-args: --timeout 2000s +namespace: default +release-label: release diff --git a/.ci/helm.sh b/.ci/helm.sh index b410dc67e..7e69ac18f 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -283,4 +283,29 @@ function ci::verify_mesh_worker_service_pulsar_admin() { RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sinks delete --name data-generator-sink) echo $RET ${KUBECTL} get pods -n ${NAMESPACE} + echo " === verify mesh worker service with empty connector config" + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options '{"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}') + echo $RET + if [[ $RET != *"successfully"* ]]; then + return 1 + fi + WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l) + while [[ ${WC} -lt 1 ]]; do + echo ${WC}; + sleep 20 + ${KUBECTL} get pods -n ${NAMESPACE} + WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l) + done + ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source) + if [[ $RET != *"true"* ]]; then + ${KUBECTL} logs -n ${NAMESPACE} data-generator-source-69865103-source-0 + ${KUBECTL} get pods data-generator-source-69865103-source-0 -o yaml + return 1 + fi + ${KUBECTL} get pods -n ${NAMESPACE} + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources delete --name data-generator-source) + echo $RET + ${KUBECTL} get pods -n ${NAMESPACE} + } diff --git a/.github/workflows/test-helm-charts.yml b/.github/workflows/test-helm-charts.yml index ac0bbce8d..0e44c79e7 100644 --- a/.github/workflows/test-helm-charts.yml +++ b/.github/workflows/test-helm-charts.yml @@ -62,8 +62,11 @@ jobs: if: steps.list-changed.outputs.changed == 'true' - name: Create kind cluster - uses: helm/kind-action@v1.1.0 + uses: helm/kind-action@v1.2.0 if: steps.list-changed.outputs.changed == 'true' + with: + node_image: kindest/node:v1.15.12 + wait: 600s - name: Set up GO 1.13 if: steps.list-changed.outputs.changed == 'true' diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index d51403985..9c7ac7660 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -18,8 +18,11 @@ package v1alpha1 import ( + "encoding/json" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) type Messaging struct { @@ -251,3 +254,37 @@ const ( SourceComponent string = "source" SinkComponent string = "sink" ) + +// Config represents untyped YAML configuration. +type Config struct { + // Data holds the configuration keys and values. + // This field exists to work around https://github.com/kubernetes-sigs/kubebuilder/issues/528 + Data map[string]interface{} `json:"-"` +} + +// NewConfig constructs a Config with the given unstructured configuration data. +func NewConfig(cfg map[string]interface{}) Config { + return Config{Data: cfg} +} + +// MarshalJSON implements the Marshaler interface. +func (c *Config) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Data) +} + +// UnmarshalJSON implements the Unmarshaler interface. +func (c *Config) UnmarshalJSON(data []byte) error { + var out map[string]interface{} + err := json.Unmarshal(data, &out) + if err != nil { + return err + } + c.Data = out + return nil +} + +// DeepCopyInto is an ~autogenerated~ deepcopy function, copying the receiver, writing into out. in must be non-nil. +// This exists here to work around https://github.com/kubernetes/code-generator/issues/50 +func (c *Config) DeepCopyInto(out *Config) { + out.Data = runtime.DeepCopyJSON(c.Data) +} diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index 8f6d5372b..c5fb89375 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -29,16 +29,18 @@ import ( type FunctionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - Tenant string `json:"tenant,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Input InputConf `json:"input,omitempty"` - Output OutputConf `json:"output,omitempty"` - LogTopic string `json:"logTopic,omitempty"` - FuncConfig map[string]string `json:"funcConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + Tenant string `json:"tenant,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Input InputConf `json:"input,omitempty"` + Output OutputConf `json:"output,omitempty"` + LogTopic string `json:"logTopic,omitempty"` + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + FuncConfig *Config `json:"funcConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 8dbcf26cd..a29a5f1b8 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -29,15 +29,18 @@ import ( type SinkSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - Tenant string `json:"tenant,omitempty"` - SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Input InputConf `json:"input,omitempty"` - SinkConfig map[string]string `json:"sinkConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + Tenant string `json:"tenant,omitempty"` + SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Input InputConf `json:"input,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + SinkConfig *Config `json:"sinkConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 1a577eb2d..ed14cb5f5 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -29,15 +29,18 @@ import ( type SourceSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - Tenant string `json:"tenant,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Output OutputConf `json:"output,omitempty"` - SourceConfig map[string]string `json:"sourceConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + Tenant string `json:"tenant,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Output OutputConf `json:"output,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + SourceConfig *Config `json:"sourceConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 0ef02279d..cfb070faf 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Config. +func (in *Config) DeepCopy() *Config { + if in == nil { + return nil + } + out := new(Config) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConsumerConfig) DeepCopyInto(out *ConsumerConfig) { *out = *in @@ -318,10 +328,7 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { in.Output.DeepCopyInto(&out.Output) if in.FuncConfig != nil { in, out := &in.FuncConfig, &out.FuncConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { @@ -772,10 +779,7 @@ func (in *SinkSpec) DeepCopyInto(out *SinkSpec) { in.Input.DeepCopyInto(&out.Input) if in.SinkConfig != nil { in, out := &in.SinkConfig, &out.SinkConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { @@ -909,10 +913,7 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { in.Output.DeepCopyInto(&out.Output) if in.SourceConfig != nil { in, out := &in.SourceConfig, &out.SourceConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml index 5bb712921..5f561a5df 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml @@ -44,9 +44,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4411,9 +4410,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6563,9 +6561,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml index 1a650681f..bf21dd5c9 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml @@ -45,9 +45,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml index 8661d19fc..bf7915149 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml @@ -2172,9 +2172,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml index cef8b21d3..109615848 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml @@ -2145,9 +2145,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 5bbe95638..8952a3331 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -44,9 +44,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4409,9 +4408,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6559,9 +6557,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 1a650681f..bf21dd5c9 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -45,9 +45,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 7c0b89ca1..652705200 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -2170,9 +2170,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 4b85ec769..5b336b952 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -2143,9 +2143,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index d221885e6..d3f766086 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -423,7 +423,10 @@ func generateResource(resources corev1.ResourceList) *proto.Resources { } } -func getUserConfig(configs map[string]string) string { +func getUserConfig(configs *v1alpha1.Config) string { + if configs == nil { + return "" + } // validated in admission web hook bytes, _ := json.Marshal(configs) return string(bytes) diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 8af323ad1..c518e2222 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -222,10 +222,10 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails { } func generateSourceInputSpec(source *v1alpha1.Source) *proto.SourceSpec { - configs, _ := json.Marshal(source.Spec.SourceConfig) + configs := getUserConfig(source.Spec.SourceConfig) return &proto.SourceSpec{ ClassName: source.Spec.ClassName, - Configs: string(configs), // TODO handle batch source + Configs: configs, // TODO handle batch source TypeClassName: source.Spec.Output.TypeClassName, } } @@ -298,10 +298,10 @@ func getSubscriptionType(retainOrdering bool, processingGuarantee v1alpha1.Proce } func generateSinkOutputSpec(sink *v1alpha1.Sink) *proto.SinkSpec { - configs, _ := json.Marshal(sink.Spec.SinkConfig) + configs := getUserConfig(sink.Spec.SinkConfig) return &proto.SinkSpec{ ClassName: sink.Spec.ClassName, - Configs: string(configs), + Configs: configs, TypeClassName: sink.Spec.Input.TypeClassName, } } diff --git a/controllers/test_utils_test.go b/controllers/test_utils_test.go index ac23c6fd3..e19266b80 100644 --- a/controllers/test_utils_test.go +++ b/controllers/test_utils_test.go @@ -177,6 +177,13 @@ func makeSinkSample() *v1alpha1.Sink { replicas := int32(1) maxReplicas := int32(1) trueVal := true + sinkConfig := v1alpha1.NewConfig(map[string]interface{}{ + "elasticSearchUrl": "http://quickstart-es-http.default.svc.cluster.local:9200", + "indexName": "my_index", + "typeName": "doc", + "username": "elastic", + "password": "wJ757TmoXEd941kXm07Z2GW3", + }) return &v1alpha1.Sink{ TypeMeta: metav1.TypeMeta{ Kind: "Sink", @@ -194,13 +201,7 @@ func makeSinkSample() *v1alpha1.Sink { }, TypeClassName: "[B", }, - SinkConfig: map[string]string{ - "elasticSearchUrl": "http://quickstart-es-http.default.svc.cluster.local:9200", - "indexName": "my_index", - "typeName": "doc", - "username": "elastic", - "password": "wJ757TmoXEd941kXm07Z2GW3", - }, + SinkConfig: &sinkConfig, Timeout: 0, MaxMessageRetry: 0, Replicas: &replicas, @@ -225,6 +226,15 @@ func makeSinkSample() *v1alpha1.Sink { func makeSourceSample() *v1alpha1.Source { replicas := int32(1) maxReplicas := int32(1) + sourceConfig := v1alpha1.NewConfig(map[string]interface{}{ + "mongodb.hosts": "rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017", + "mongodb.name": "dbserver1", + "mongodb.user": "debezium", + "mongodb.password": "dbz", + "mongodb.task.id": "1", + "database.whitelist": "inventory", + "pulsar.service.url": "pulsar://test-pulsar-broker.default.svc.cluster.local:6650", + }) return &v1alpha1.Source{ TypeMeta: metav1.TypeMeta{ Kind: "Sink", @@ -245,17 +255,9 @@ func makeSourceSample() *v1alpha1.Source { UseThreadLocalProducers: true, }, }, - SourceConfig: map[string]string{ - "mongodb.hosts": "rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017", - "mongodb.name": "dbserver1", - "mongodb.user": "debezium", - "mongodb.password": "dbz", - "mongodb.task.id": "1", - "database.whitelist": "inventory", - "pulsar.service.url": "pulsar://test-pulsar-broker.default.svc.cluster.local:6650", - }, - Replicas: &replicas, - MaxReplicas: &maxReplicas, + SourceConfig: &sourceConfig, + Replicas: &replicas, + MaxReplicas: &maxReplicas, Messaging: v1alpha1.Messaging{ Pulsar: &v1alpha1.PulsarMessaging{ PulsarConfig: TestClusterName, diff --git a/manifests/crd.yaml b/manifests/crd.yaml index cca2ce451..88aac7718 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -42,9 +42,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4407,9 +4406,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6557,9 +6555,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: @@ -6681,9 +6678,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -11110,9 +11106,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -13324,9 +13319,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java index b6f32581e..0a374f833 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java @@ -42,7 +42,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static io.functionmesh.compute.util.KubernetesUtils.GRPC_TIMEOUT_SECS; @@ -136,20 +135,6 @@ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(Str return null; } - public static Map transformedMapValueToString(Map map) { - if (map == null) { - return null; - } - return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue()))); - } - - public static Map transformedMapValueToObject(Map map) { - if (map == null) { - return null; - } - return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - // Return a CustomRuntimeOption if a json string is provided, otherwise an empty object is returned public static CustomRuntimeOptions getCustomRuntimeOptions(String customRuntimeOptionsJSON) { CustomRuntimeOptions customRuntimeOptions; diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index 4c04821cc..f6105920c 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -126,8 +126,6 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String extractedSinkDetails.setSinkClassName(sinkConfig.getClassName()); } - - V1alpha1SinkSpecInput v1alpha1SinkSpecInput = new V1alpha1SinkSpecInput(); for (Map.Entry inputSpecs : functionDetails.getSource().getInputSpecsMap().entrySet()) { @@ -260,7 +258,7 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String v1alpha1SinkSpec.setClusterName(clusterName); - v1alpha1SinkSpec.setSinkConfig(CommonUtil.transformedMapValueToString(sinkConfig.getConfigs())); + v1alpha1SinkSpec.setSinkConfig(sinkConfig.getConfigs()); v1alpha1Sink.setSpec(v1alpha1SinkSpec); @@ -376,7 +374,7 @@ public static SinkConfig createSinkConfigFromV1alpha1Sink( } sinkConfig.setClassName(v1alpha1SinkSpec.getClassName()); if (v1alpha1SinkSpec.getSinkConfig() != null) { - sinkConfig.setConfigs(new HashMap<>(v1alpha1SinkSpec.getSinkConfig())); + sinkConfig.setConfigs((Map) v1alpha1SinkSpec.getSinkConfig()); } // TODO: secretsMap diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index bda1e57bb..fb29ed09e 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -213,7 +213,7 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S v1alpha1SourceSpec.setClusterName(clusterName); - v1alpha1SourceSpec.setSourceConfig(CommonUtil.transformedMapValueToString(sourceConfig.getConfigs())); + v1alpha1SourceSpec.setSourceConfig(sourceConfig.getConfigs()); v1alpha1Source.setSpec(v1alpha1SourceSpec); @@ -284,7 +284,7 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S } if (v1alpha1SourceSpec.getSourceConfig() != null) { - sourceConfig.setConfigs(new HashMap<>(v1alpha1SourceSpec.getSourceConfig())); + sourceConfig.setConfigs((Map) v1alpha1SourceSpec.getSourceConfig()); } // TODO: secretsMap @@ -304,7 +304,6 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S sourceConfig.setRuntimeFlags(v1alpha1SourceSpec.getRuntimeFlags()); } - if (v1alpha1SourceSpec.getJava() != null && Strings.isNotEmpty(v1alpha1SourceSpec.getJava().getJar())) { sourceConfig.setArchive(v1alpha1SourceSpec.getJava().getJar()); } @@ -366,7 +365,6 @@ public static void convertFunctionStatusToInstanceStatusData(InstanceCommunicati } instanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - instanceStatusData.setNumWritten(functionStatus.getNumSuccessfullyProcessed()); instanceStatusData.setLastReceivedTime(functionStatus.getLastInvocationTime()); } diff --git a/tools/migration.go b/tools/migration.go index 95a8d9e46..e2235389a 100755 --- a/tools/migration.go +++ b/tools/migration.go @@ -100,12 +100,13 @@ func main() { topics = append(topics, k) } } - funcConfig := make(map[string]string) + funcConfig := make(map[string]interface{}) for key, value := range functionConfig.UserConfig { strKey := fmt.Sprintf("%v", key) strValue := fmt.Sprintf("%v", value) funcConfig[strKey] = strValue } + funcConfigData := v1alpha1.NewConfig(funcConfig) maxMessageRetry := int32(0) if functionConfig.MaxMessageRetries != nil { maxMessageRetry = int32(*functionConfig.MaxMessageRetries) @@ -150,7 +151,7 @@ func main() { corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%d", functionConfig.Resources.RAM/1024/1024/1024)), }, }, - FuncConfig: funcConfig, + FuncConfig: &funcConfigData, MaxMessageRetry: maxMessageRetry, } if functionConfig.ProcessingGuarantees != "" {