From 633484b117b036d224a2f4ae136ecb8ab7dcfef7 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 20 Jul 2021 18:04:54 +0800 Subject: [PATCH 01/13] add `Config` and move function/connector config to `Config` --- api/v1alpha1/common.go | 49 +++++++++++++++++++ api/v1alpha1/function_types.go | 22 +++++---- api/v1alpha1/sink_types.go | 21 ++++---- api/v1alpha1/source_types.go | 21 ++++---- api/v1alpha1/zz_generated.deepcopy.go | 25 +++++----- ...ompute.functionmesh.io_functionmeshes.yaml | 9 ++-- .../compute.functionmesh.io_functions.yaml | 3 +- .../bases/compute.functionmesh.io_sinks.yaml | 3 +- .../compute.functionmesh.io_sources.yaml | 3 +- manifests/crd.yaml | 18 +++---- 10 files changed, 110 insertions(+), 64 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index d51403985..5f76a17ab 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -18,6 +18,8 @@ package v1alpha1 import ( + "encoding/json" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -251,3 +253,50 @@ const ( SourceComponent string = "source" SinkComponent string = "sink" ) + +// Config represents untyped YAML configuration. +// +kubebuilder:validation:Type=object +type Config struct { + // Data holds the configuration keys and values. + // This field exists to work around https://github.com/kubernetes-sigs/kubebuilder/issues/528 + Data map[string]interface{} `json:"-"` +} + +// NewConfig constructs a Config with the given unstructured configuration data. +func NewConfig(cfg map[string]interface{}) Config { + return Config{Data: cfg} +} + +// MarshalJSON implements the Marshaler interface. +func (c *Config) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Data) +} + +// UnmarshalJSON implements the Unmarshaler interface. +func (c *Config) UnmarshalJSON(data []byte) error { + var out map[string]interface{} + err := json.Unmarshal(data, &out) + if err != nil { + return err + } + c.Data = out + return nil +} + +// DeepCopyInto is an ~autogenerated~ deepcopy function, copying the receiver, writing into out. in must be non-nil. +// This exists here to work around https://github.com/kubernetes/code-generator/issues/50 +func (c *Config) DeepCopyInto(out *Config) { + bytes, err := json.Marshal(c.Data) + if err != nil { + // we assume that it marshals cleanly because otherwise the resource would not have been + // created in the API server + panic(err) + } + var clone map[string]interface{} + err = json.Unmarshal(bytes, &clone) + if err != nil { + // we assume again optimistically because we just marshalled that the round trip works as well + panic(err) + } + out.Data = clone +} diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index 8f6d5372b..c5fb89375 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -29,16 +29,18 @@ import ( type FunctionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - Tenant string `json:"tenant,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Input InputConf `json:"input,omitempty"` - Output OutputConf `json:"output,omitempty"` - LogTopic string `json:"logTopic,omitempty"` - FuncConfig map[string]string `json:"funcConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + Tenant string `json:"tenant,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Input InputConf `json:"input,omitempty"` + Output OutputConf `json:"output,omitempty"` + LogTopic string `json:"logTopic,omitempty"` + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + FuncConfig *Config `json:"funcConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 8dbcf26cd..a29a5f1b8 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -29,15 +29,18 @@ import ( type SinkSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - Tenant string `json:"tenant,omitempty"` - SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Input InputConf `json:"input,omitempty"` - SinkConfig map[string]string `json:"sinkConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + Tenant string `json:"tenant,omitempty"` + SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Input InputConf `json:"input,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + SinkConfig *Config `json:"sinkConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 1a577eb2d..ed14cb5f5 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -29,15 +29,18 @@ import ( type SourceSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Name string `json:"name,omitempty"` - ClassName string `json:"className,omitempty"` - Tenant string `json:"tenant,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` - MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling - Output OutputConf `json:"output,omitempty"` - SourceConfig map[string]string `json:"sourceConfig,omitempty"` + Name string `json:"name,omitempty"` + ClassName string `json:"className,omitempty"` + Tenant string `json:"tenant,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector + Replicas *int32 `json:"replicas,omitempty"` + MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling + Output OutputConf `json:"output,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + SourceConfig *Config `json:"sourceConfig,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 0ef02279d..cfb070faf 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Config. +func (in *Config) DeepCopy() *Config { + if in == nil { + return nil + } + out := new(Config) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConsumerConfig) DeepCopyInto(out *ConsumerConfig) { *out = *in @@ -318,10 +328,7 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { in.Output.DeepCopyInto(&out.Output) if in.FuncConfig != nil { in, out := &in.FuncConfig, &out.FuncConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { @@ -772,10 +779,7 @@ func (in *SinkSpec) DeepCopyInto(out *SinkSpec) { in.Input.DeepCopyInto(&out.Input) if in.SinkConfig != nil { in, out := &in.SinkConfig, &out.SinkConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { @@ -909,10 +913,7 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { in.Output.DeepCopyInto(&out.Output) if in.SourceConfig != nil { in, out := &in.SourceConfig, &out.SourceConfig - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } + *out = (*in).DeepCopy() } in.Resources.DeepCopyInto(&out.Resources) if in.SecretsMap != nil { diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 5bbe95638..8952a3331 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -44,9 +44,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4409,9 +4408,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6559,9 +6557,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 1a650681f..bf21dd5c9 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -45,9 +45,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 7c0b89ca1..652705200 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -2170,9 +2170,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 4b85ec769..5b336b952 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -2143,9 +2143,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/manifests/crd.yaml b/manifests/crd.yaml index cca2ce451..88aac7718 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -42,9 +42,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4407,9 +4406,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6557,9 +6555,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: @@ -6681,9 +6678,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -11110,9 +11106,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -13324,9 +13319,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: From 09ce08d63c048898355627746c16273bcf84352c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 21 Jul 2021 17:39:25 +0800 Subject: [PATCH 02/13] update mesh worker service & tools --- api/v1alpha1/common.go | 1 - ...ompute.functionmesh.io_functionmeshes.yaml | 9 ++--- .../compute.functionmesh.io_functions.yaml | 3 +- .../crds/compute.functionmesh.io_sinks.yaml | 3 +- .../crds/compute.functionmesh.io_sources.yaml | 3 +- controllers/spec/common.go | 5 ++- controllers/test_utils_test.go | 38 ++++++++++--------- .../functionmesh/compute/util/SinksUtil.java | 6 ++- .../compute/util/SourcesUtil.java | 5 ++- tools/migration.go | 5 ++- 10 files changed, 40 insertions(+), 38 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 5f76a17ab..da9a4aa04 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -255,7 +255,6 @@ const ( ) // Config represents untyped YAML configuration. -// +kubebuilder:validation:Type=object type Config struct { // Data holds the configuration keys and values. // This field exists to work around https://github.com/kubernetes-sigs/kubebuilder/issues/528 diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml index 5bb712921..5f561a5df 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_functionmeshes.yaml @@ -44,9 +44,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: @@ -4411,9 +4410,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: @@ -6563,9 +6561,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml index 1a650681f..bf21dd5c9 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_functions.yaml @@ -45,9 +45,8 @@ spec: forwardSourceMessageProperty: type: boolean funcConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true golang: properties: go: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml index 8661d19fc..bf7915149 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_sinks.yaml @@ -2172,9 +2172,8 @@ spec: type: object type: object sinkConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sinkType: type: string subscriptionName: diff --git a/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml b/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml index cef8b21d3..109615848 100644 --- a/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml +++ b/charts/function-mesh-operator/crds/compute.functionmesh.io_sources.yaml @@ -2145,9 +2145,8 @@ spec: type: object type: object sourceConfig: - additionalProperties: - type: string type: object + x-kubernetes-preserve-unknown-fields: true sourceType: type: string tenant: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index d221885e6..d3f766086 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -423,7 +423,10 @@ func generateResource(resources corev1.ResourceList) *proto.Resources { } } -func getUserConfig(configs map[string]string) string { +func getUserConfig(configs *v1alpha1.Config) string { + if configs == nil { + return "" + } // validated in admission web hook bytes, _ := json.Marshal(configs) return string(bytes) diff --git a/controllers/test_utils_test.go b/controllers/test_utils_test.go index ac23c6fd3..e19266b80 100644 --- a/controllers/test_utils_test.go +++ b/controllers/test_utils_test.go @@ -177,6 +177,13 @@ func makeSinkSample() *v1alpha1.Sink { replicas := int32(1) maxReplicas := int32(1) trueVal := true + sinkConfig := v1alpha1.NewConfig(map[string]interface{}{ + "elasticSearchUrl": "http://quickstart-es-http.default.svc.cluster.local:9200", + "indexName": "my_index", + "typeName": "doc", + "username": "elastic", + "password": "wJ757TmoXEd941kXm07Z2GW3", + }) return &v1alpha1.Sink{ TypeMeta: metav1.TypeMeta{ Kind: "Sink", @@ -194,13 +201,7 @@ func makeSinkSample() *v1alpha1.Sink { }, TypeClassName: "[B", }, - SinkConfig: map[string]string{ - "elasticSearchUrl": "http://quickstart-es-http.default.svc.cluster.local:9200", - "indexName": "my_index", - "typeName": "doc", - "username": "elastic", - "password": "wJ757TmoXEd941kXm07Z2GW3", - }, + SinkConfig: &sinkConfig, Timeout: 0, MaxMessageRetry: 0, Replicas: &replicas, @@ -225,6 +226,15 @@ func makeSinkSample() *v1alpha1.Sink { func makeSourceSample() *v1alpha1.Source { replicas := int32(1) maxReplicas := int32(1) + sourceConfig := v1alpha1.NewConfig(map[string]interface{}{ + "mongodb.hosts": "rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017", + "mongodb.name": "dbserver1", + "mongodb.user": "debezium", + "mongodb.password": "dbz", + "mongodb.task.id": "1", + "database.whitelist": "inventory", + "pulsar.service.url": "pulsar://test-pulsar-broker.default.svc.cluster.local:6650", + }) return &v1alpha1.Source{ TypeMeta: metav1.TypeMeta{ Kind: "Sink", @@ -245,17 +255,9 @@ func makeSourceSample() *v1alpha1.Source { UseThreadLocalProducers: true, }, }, - SourceConfig: map[string]string{ - "mongodb.hosts": "rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017", - "mongodb.name": "dbserver1", - "mongodb.user": "debezium", - "mongodb.password": "dbz", - "mongodb.task.id": "1", - "database.whitelist": "inventory", - "pulsar.service.url": "pulsar://test-pulsar-broker.default.svc.cluster.local:6650", - }, - Replicas: &replicas, - MaxReplicas: &maxReplicas, + SourceConfig: &sourceConfig, + Replicas: &replicas, + MaxReplicas: &maxReplicas, Messaging: v1alpha1.Messaging{ Pulsar: &v1alpha1.PulsarMessaging{ PulsarConfig: TestClusterName, diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index 4c04821cc..8324b57b3 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -19,6 +19,7 @@ package io.functionmesh.compute.util; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.functionmesh.compute.models.CustomRuntimeOptions; import io.functionmesh.compute.models.FunctionMeshConnectorDefinition; import io.functionmesh.compute.sinks.models.V1alpha1Sink; @@ -260,7 +261,8 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String v1alpha1SinkSpec.setClusterName(clusterName); - v1alpha1SinkSpec.setSinkConfig(CommonUtil.transformedMapValueToString(sinkConfig.getConfigs())); +// String sinkConfigJson = new Gson().toJson(sinkConfig.getConfigs()); + v1alpha1SinkSpec.setSinkConfig(sinkConfig.getConfigs()); v1alpha1Sink.setSpec(v1alpha1SinkSpec); @@ -376,7 +378,7 @@ public static SinkConfig createSinkConfigFromV1alpha1Sink( } sinkConfig.setClassName(v1alpha1SinkSpec.getClassName()); if (v1alpha1SinkSpec.getSinkConfig() != null) { - sinkConfig.setConfigs(new HashMap<>(v1alpha1SinkSpec.getSinkConfig())); + sinkConfig.setConfigs((Map) v1alpha1SinkSpec.getSinkConfig()); } // TODO: secretsMap diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index bda1e57bb..6944f3de5 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -19,6 +19,7 @@ package io.functionmesh.compute.util; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.functionmesh.compute.models.CustomRuntimeOptions; import io.functionmesh.compute.models.FunctionMeshConnectorDefinition; import io.functionmesh.compute.sources.models.V1alpha1Source; @@ -213,7 +214,7 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S v1alpha1SourceSpec.setClusterName(clusterName); - v1alpha1SourceSpec.setSourceConfig(CommonUtil.transformedMapValueToString(sourceConfig.getConfigs())); + v1alpha1SourceSpec.setSourceConfig(sourceConfig.getConfigs()); v1alpha1Source.setSpec(v1alpha1SourceSpec); @@ -284,7 +285,7 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S } if (v1alpha1SourceSpec.getSourceConfig() != null) { - sourceConfig.setConfigs(new HashMap<>(v1alpha1SourceSpec.getSourceConfig())); + sourceConfig.setConfigs((Map) v1alpha1SourceSpec.getSourceConfig()); } // TODO: secretsMap diff --git a/tools/migration.go b/tools/migration.go index 95a8d9e46..e2235389a 100755 --- a/tools/migration.go +++ b/tools/migration.go @@ -100,12 +100,13 @@ func main() { topics = append(topics, k) } } - funcConfig := make(map[string]string) + funcConfig := make(map[string]interface{}) for key, value := range functionConfig.UserConfig { strKey := fmt.Sprintf("%v", key) strValue := fmt.Sprintf("%v", value) funcConfig[strKey] = strValue } + funcConfigData := v1alpha1.NewConfig(funcConfig) maxMessageRetry := int32(0) if functionConfig.MaxMessageRetries != nil { maxMessageRetry = int32(*functionConfig.MaxMessageRetries) @@ -150,7 +151,7 @@ func main() { corev1.ResourceMemory: resource.MustParse(fmt.Sprintf("%d", functionConfig.Resources.RAM/1024/1024/1024)), }, }, - FuncConfig: funcConfig, + FuncConfig: &funcConfigData, MaxMessageRetry: maxMessageRetry, } if functionConfig.ProcessingGuarantees != "" { From 95caa0605f8f169265a94bba393779fb75c35eb0 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 21 Jul 2021 17:55:04 +0800 Subject: [PATCH 03/13] cleanup --- .../io/functionmesh/compute/util/CommonUtil.java | 15 --------------- .../io/functionmesh/compute/util/SinksUtil.java | 4 ---- .../io/functionmesh/compute/util/SourcesUtil.java | 3 --- 3 files changed, 22 deletions(-) diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java index b6f32581e..0a374f833 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java @@ -42,7 +42,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static io.functionmesh.compute.util.KubernetesUtils.GRPC_TIMEOUT_SECS; @@ -136,20 +135,6 @@ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(Str return null; } - public static Map transformedMapValueToString(Map map) { - if (map == null) { - return null; - } - return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue()))); - } - - public static Map transformedMapValueToObject(Map map) { - if (map == null) { - return null; - } - return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - // Return a CustomRuntimeOption if a json string is provided, otherwise an empty object is returned public static CustomRuntimeOptions getCustomRuntimeOptions(String customRuntimeOptionsJSON) { CustomRuntimeOptions customRuntimeOptions; diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index 8324b57b3..f6105920c 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -19,7 +19,6 @@ package io.functionmesh.compute.util; import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import io.functionmesh.compute.models.CustomRuntimeOptions; import io.functionmesh.compute.models.FunctionMeshConnectorDefinition; import io.functionmesh.compute.sinks.models.V1alpha1Sink; @@ -127,8 +126,6 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String extractedSinkDetails.setSinkClassName(sinkConfig.getClassName()); } - - V1alpha1SinkSpecInput v1alpha1SinkSpecInput = new V1alpha1SinkSpecInput(); for (Map.Entry inputSpecs : functionDetails.getSource().getInputSpecsMap().entrySet()) { @@ -261,7 +258,6 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String v1alpha1SinkSpec.setClusterName(clusterName); -// String sinkConfigJson = new Gson().toJson(sinkConfig.getConfigs()); v1alpha1SinkSpec.setSinkConfig(sinkConfig.getConfigs()); v1alpha1Sink.setSpec(v1alpha1SinkSpec); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index 6944f3de5..fb29ed09e 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -19,7 +19,6 @@ package io.functionmesh.compute.util; import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import io.functionmesh.compute.models.CustomRuntimeOptions; import io.functionmesh.compute.models.FunctionMeshConnectorDefinition; import io.functionmesh.compute.sources.models.V1alpha1Source; @@ -305,7 +304,6 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S sourceConfig.setRuntimeFlags(v1alpha1SourceSpec.getRuntimeFlags()); } - if (v1alpha1SourceSpec.getJava() != null && Strings.isNotEmpty(v1alpha1SourceSpec.getJava().getJar())) { sourceConfig.setArchive(v1alpha1SourceSpec.getJava().getJar()); } @@ -367,7 +365,6 @@ public static void convertFunctionStatusToInstanceStatusData(InstanceCommunicati } instanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - instanceStatusData.setNumWritten(functionStatus.getNumSuccessfullyProcessed()); instanceStatusData.setLastReceivedTime(functionStatus.getLastInvocationTime()); } From e954c111069f33d663857fa94471ddedbc0ed5e8 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 21 Jul 2021 20:02:51 +0800 Subject: [PATCH 04/13] fix CI --- .ci/ct.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.ci/ct.yaml b/.ci/ct.yaml index 08b35e9da..cc5491ab7 100644 --- a/.ci/ct.yaml +++ b/.ci/ct.yaml @@ -3,3 +3,4 @@ chart-dirs: - charts all: true validate-maintainers: false +helm-extra-args: --timeout 800s From b0a411961e20f12bf6d32bfa61e98845bcfdfc75 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 22 Jul 2021 10:08:36 +0800 Subject: [PATCH 05/13] fix CI --- .ci/ct.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.ci/ct.yaml b/.ci/ct.yaml index cc5491ab7..a5bb2d5b9 100644 --- a/.ci/ct.yaml +++ b/.ci/ct.yaml @@ -1,6 +1,7 @@ check-version-increment: false chart-dirs: - charts -all: true +all: false validate-maintainers: false helm-extra-args: --timeout 800s +namespace: default From e852fa262f65761df0b4a0b40d536ecd6c9a32ef Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 22 Jul 2021 10:18:15 +0800 Subject: [PATCH 06/13] fix CI --- .ci/ct.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.ci/ct.yaml b/.ci/ct.yaml index a5bb2d5b9..718699768 100644 --- a/.ci/ct.yaml +++ b/.ci/ct.yaml @@ -5,3 +5,4 @@ all: false validate-maintainers: false helm-extra-args: --timeout 800s namespace: default +release-label: release From ac5aa8f175da231517c38443d2ae0cd0dae6ddf3 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 22 Jul 2021 10:53:13 +0800 Subject: [PATCH 07/13] fix CI --- .github/workflows/test-helm-charts.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test-helm-charts.yml b/.github/workflows/test-helm-charts.yml index ac0bbce8d..02e7bd9cb 100644 --- a/.github/workflows/test-helm-charts.yml +++ b/.github/workflows/test-helm-charts.yml @@ -64,6 +64,8 @@ jobs: - name: Create kind cluster uses: helm/kind-action@v1.1.0 if: steps.list-changed.outputs.changed == 'true' + with: + node_image: kindest/node:v1.15.12 - name: Set up GO 1.13 if: steps.list-changed.outputs.changed == 'true' From e7ec67c8cff38f7807661a1fea0a981026f06ddb Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 22 Jul 2021 11:50:08 +0800 Subject: [PATCH 08/13] fix CI --- .ci/ct.yaml | 2 +- .github/workflows/test-helm-charts.yml | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.ci/ct.yaml b/.ci/ct.yaml index 718699768..78760974d 100644 --- a/.ci/ct.yaml +++ b/.ci/ct.yaml @@ -3,6 +3,6 @@ chart-dirs: - charts all: false validate-maintainers: false -helm-extra-args: --timeout 800s +helm-extra-args: --timeout 2000s namespace: default release-label: release diff --git a/.github/workflows/test-helm-charts.yml b/.github/workflows/test-helm-charts.yml index 02e7bd9cb..0e44c79e7 100644 --- a/.github/workflows/test-helm-charts.yml +++ b/.github/workflows/test-helm-charts.yml @@ -62,10 +62,11 @@ jobs: if: steps.list-changed.outputs.changed == 'true' - name: Create kind cluster - uses: helm/kind-action@v1.1.0 + uses: helm/kind-action@v1.2.0 if: steps.list-changed.outputs.changed == 'true' with: node_image: kindest/node:v1.15.12 + wait: 600s - name: Set up GO 1.13 if: steps.list-changed.outputs.changed == 'true' From b78380e4ae5ba4d7732be9938f88b1ca308ed80f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 17:14:17 +0800 Subject: [PATCH 09/13] address reviews --- api/v1alpha1/common.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index da9a4aa04..9c7ac7660 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) type Messaging struct { @@ -285,17 +286,5 @@ func (c *Config) UnmarshalJSON(data []byte) error { // DeepCopyInto is an ~autogenerated~ deepcopy function, copying the receiver, writing into out. in must be non-nil. // This exists here to work around https://github.com/kubernetes/code-generator/issues/50 func (c *Config) DeepCopyInto(out *Config) { - bytes, err := json.Marshal(c.Data) - if err != nil { - // we assume that it marshals cleanly because otherwise the resource would not have been - // created in the API server - panic(err) - } - var clone map[string]interface{} - err = json.Unmarshal(bytes, &clone) - if err != nil { - // we assume again optimistically because we just marshalled that the round trip works as well - panic(err) - } - out.Data = clone + out.Data = runtime.DeepCopyJSON(c.Data) } From c282c7f6b77c83fcdfe0e528d2ad2fc0a7a7b15c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 5 Aug 2021 16:41:40 +0800 Subject: [PATCH 10/13] add verify of empty config --- .ci/helm.sh | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/.ci/helm.sh b/.ci/helm.sh index b410dc67e..0883ccc08 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -283,4 +283,27 @@ function ci::verify_mesh_worker_service_pulsar_admin() { RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sinks delete --name data-generator-sink) echo $RET ${KUBECTL} get pods -n ${NAMESPACE} + echo " === verify mesh worker service with empty connector config" + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options '{"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}') + echo $RET + if [[ $RET != *"successfully"* ]]; then + return 1 + fi + WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l) + while [[ ${WC} -lt 1 ]]; do + echo ${WC}; + sleep 20 + ${KUBECTL} get pods -n ${NAMESPACE} + WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l) + done + ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source) + if [[ $RET != *"true"* ]]; then + return 1 + fi + ${KUBECTL} get pods -n ${NAMESPACE} + RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources delete --name data-generator-source) + echo $RET + ${KUBECTL} get pods -n ${NAMESPACE} + } From 6fdc66355bbba134e9b4a30eab7b2da1a8a3cf7c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 5 Aug 2021 17:24:10 +0800 Subject: [PATCH 11/13] show more logs --- .ci/helm.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.ci/helm.sh b/.ci/helm.sh index 0883ccc08..7e69ac18f 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -299,6 +299,8 @@ function ci::verify_mesh_worker_service_pulsar_admin() { ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source) if [[ $RET != *"true"* ]]; then + ${KUBECTL} logs -n ${NAMESPACE} data-generator-source-69865103-source-0 + ${KUBECTL} get pods data-generator-source-69865103-source-0 -o yaml return 1 fi ${KUBECTL} get pods -n ${NAMESPACE} From 139c57480b4bdd9bf96f9bd8afc9cbc0fd4cd1b3 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 5 Aug 2021 23:08:47 +0800 Subject: [PATCH 12/13] return empty user config --- controllers/spec/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index d3f766086..458e18a9a 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -425,7 +425,7 @@ func generateResource(resources corev1.ResourceList) *proto.Resources { func getUserConfig(configs *v1alpha1.Config) string { if configs == nil { - return "" + return "{}" } // validated in admission web hook bytes, _ := json.Marshal(configs) From a887eacf91696e40e46e61615e77b67f7efad585 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 6 Aug 2021 08:13:26 +0800 Subject: [PATCH 13/13] fix CI --- controllers/spec/common.go | 2 +- controllers/spec/utils.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 458e18a9a..d3f766086 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -425,7 +425,7 @@ func generateResource(resources corev1.ResourceList) *proto.Resources { func getUserConfig(configs *v1alpha1.Config) string { if configs == nil { - return "{}" + return "" } // validated in admission web hook bytes, _ := json.Marshal(configs) diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 8af323ad1..c518e2222 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -222,10 +222,10 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails { } func generateSourceInputSpec(source *v1alpha1.Source) *proto.SourceSpec { - configs, _ := json.Marshal(source.Spec.SourceConfig) + configs := getUserConfig(source.Spec.SourceConfig) return &proto.SourceSpec{ ClassName: source.Spec.ClassName, - Configs: string(configs), // TODO handle batch source + Configs: configs, // TODO handle batch source TypeClassName: source.Spec.Output.TypeClassName, } } @@ -298,10 +298,10 @@ func getSubscriptionType(retainOrdering bool, processingGuarantee v1alpha1.Proce } func generateSinkOutputSpec(sink *v1alpha1.Sink) *proto.SinkSpec { - configs, _ := json.Marshal(sink.Spec.SinkConfig) + configs := getUserConfig(sink.Spec.SinkConfig) return &proto.SinkSpec{ ClassName: sink.Spec.ClassName, - Configs: string(configs), + Configs: configs, TypeClassName: sink.Spec.Input.TypeClassName, } }