From 1ad1a4854e61067a28301c4407767821fd152ee7 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 23 Jul 2021 09:56:58 +0800 Subject: [PATCH 01/15] add openapi validation to CRD --- api/v1alpha1/common.go | 10 +++ api/v1alpha1/function_types.go | 16 ++++- api/v1alpha1/sink_types.go | 27 +++++--- api/v1alpha1/source_types.go | 12 +++- ...ompute.functionmesh.io_functionmeshes.yaml | 33 ++++++++++ .../compute.functionmesh.io_functions.yaml | 13 ++++ .../bases/compute.functionmesh.io_sinks.yaml | 12 ++++ .../compute.functionmesh.io_sources.yaml | 8 +++ manifests/crd.yaml | 66 +++++++++++++++++++ 9 files changed, 183 insertions(+), 14 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index d6431423b..129c83a41 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -122,18 +122,24 @@ type Runtime struct { Golang *GoRuntime `json:"golang,omitempty"` } +// +kubebuilder:validation:Optional type JavaRuntime struct { + // +kubebuilder:validation:Required Jar string `json:"jar,omitempty"` JarLocation string `json:"jarLocation,omitempty"` ExtraDependenciesDir string `json:"extraDependenciesDir,omitempty"` } +// +kubebuilder:validation:Optional type PythonRuntime struct { + // +kubebuilder:validation:Required Py string `json:"py,omitempty"` PyLocation string `json:"pyLocation,omitempty"` } +// +kubebuilder:validation:Optional type GoRuntime struct { + // +kubebuilder:validation:Required Go string `json:"go,omitempty"` GoLocation string `json:"goLocation,omitempty"` } @@ -144,6 +150,7 @@ type SecretRef struct { } type InputConf struct { + // +kubebuilder:default="[B" TypeClassName string `json:"typeClassName,omitempty"` Topics []string `json:"topics,omitempty"` TopicPattern string `json:"topicPattern,omitempty"` @@ -163,6 +170,7 @@ type ConsumerConfig struct { } type OutputConf struct { + // +kubebuilder:default="[B" TypeClassName string `json:"typeClassName,omitempty"` Topic string `json:"topic,omitempty"` SinkSerdeClassName string `json:"sinkSerdeClassName,omitempty"` @@ -195,6 +203,7 @@ type CryptoSecret struct { //AsEnv string `json:"asEnv,omitempty"` } +// +kubebuilder:validation:Enum=latest;earliest type SubscribePosition string const ( @@ -239,6 +248,7 @@ const ( NoAction ReconcileAction = "NoAction" ) +// +kubebuilder:validation:Enum=atleast_once;atmost_once;effectively_once type ProcessGuarantee string const ( diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index 6cf8e0b77..52ea00070 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -26,15 +26,20 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // FunctionSpec defines the desired state of Function +// +kubebuilder:validation:Optional type FunctionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + // +kubebuilder:default=1 + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -50,7 +55,9 @@ type FunctionSpec struct { SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - Timeout int32 `json:"timeout,omitempty"` + Timeout int32 `json:"timeout,omitempty"` + // +kubebuilder:default=true + // +kubebuilder:validation:Required AutoAck *bool `json:"autoAck,omitempty"` MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` @@ -69,8 +76,11 @@ type FunctionSpec struct { // TODO: windowconfig, customRuntimeOptions? + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run function pods. // default is streamnative/pulsar-functions-java-runner diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 9e75bdb12..1631cbe4f 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -26,16 +26,21 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // SinkSpec defines the desired state of Topic +// +kubebuilder:validation:Optional type SinkSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` ClusterName string `json:"clusterName,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + // +kubebuilder:default=1 + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -50,13 +55,15 @@ type SinkSpec struct { SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - Timeout int32 `json:"timeout,omitempty"` - NegativeAckRedeliveryDelayMs int32 `json:"negativeAckRedeliveryDelayMs,omitempty"` - AutoAck *bool `json:"autoAck,omitempty"` - MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` - ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` - RetainOrdering bool `json:"retainOrdering,omitempty"` - DeadLetterTopic string `json:"deadLetterTopic,omitempty"` + Timeout int32 `json:"timeout,omitempty"` + NegativeAckRedeliveryDelayMs int32 `json:"negativeAckRedeliveryDelayMs,omitempty"` + // +kubebuilder:default=true + // +kubebuilder:validation:Required + AutoAck *bool `json:"autoAck,omitempty"` + MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` + ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` + RetainOrdering bool `json:"retainOrdering,omitempty"` + DeadLetterTopic string `json:"deadLetterTopic,omitempty"` RuntimeFlags string `json:"runtimeFlags,omitempty"` SubscriptionName string `json:"subscriptionName,omitempty"` @@ -65,8 +72,10 @@ type SinkSpec struct { Pod PodPolicy `json:"pod,omitempty"` + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run sink pods. // default is streamnative/pulsar-functions-java-runner diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 29d656341..3cc54780d 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -26,16 +26,21 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // SourceSpec defines the desired state of Source +// +kubebuilder:validation:Optional type SourceSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + // +kubebuilder:default=1 + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -54,8 +59,11 @@ type SourceSpec struct { ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"` Pod PodPolicy `json:"pod,omitempty"` + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run source pods. // default is streamnative/pulsar-functions-java-runner diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 4d54c48e5..19efe410c 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -32,6 +32,7 @@ spec: items: properties: autoAck: + default: true type: boolean className: type: string @@ -127,6 +128,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -210,6 +212,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -2443,6 +2446,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2461,7 +2468,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -2492,8 +2501,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -2524,6 +2537,7 @@ spec: items: properties: autoAck: + default: true type: boolean className: type: string @@ -2614,6 +2628,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -4869,6 +4884,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -4887,7 +4906,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -4921,8 +4942,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -5042,6 +5067,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -7275,6 +7301,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -7293,7 +7323,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -7323,6 +7355,7 @@ spec: sourceType: type: string tenant: + default: public type: string volumeMounts: items: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 72a11da17..6d1173d63 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -33,6 +33,7 @@ spec: spec: properties: autoAck: + default: true type: boolean className: type: string @@ -128,6 +129,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -211,6 +213,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -2444,6 +2447,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2462,7 +2469,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -2493,8 +2502,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 38edca51d..daf4faf32 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -33,6 +33,7 @@ spec: spec: properties: autoAck: + default: true type: boolean className: type: string @@ -123,6 +124,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -2378,6 +2380,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2396,7 +2402,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -2430,8 +2438,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 9a9e22c3a..1b1e59187 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -122,6 +122,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -2355,6 +2356,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2373,7 +2378,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -2403,6 +2410,7 @@ spec: sourceType: type: string tenant: + default: public type: string volumeMounts: items: diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 6b73066d9..cee7de59a 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -30,6 +30,7 @@ spec: items: properties: autoAck: + default: true type: boolean className: type: string @@ -125,6 +126,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -208,6 +210,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -2441,6 +2444,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2459,7 +2466,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -2490,8 +2499,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -2522,6 +2535,7 @@ spec: items: properties: autoAck: + default: true type: boolean className: type: string @@ -2612,6 +2626,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -4867,6 +4882,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -4885,7 +4904,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -4919,8 +4940,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -5040,6 +5065,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -7273,6 +7299,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -7291,7 +7321,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -7321,6 +7353,7 @@ spec: sourceType: type: string tenant: + default: public type: string volumeMounts: items: @@ -7427,6 +7460,7 @@ spec: spec: properties: autoAck: + default: true type: boolean className: type: string @@ -7522,6 +7556,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -7605,6 +7640,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -9838,6 +9874,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -9856,7 +9896,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -9887,8 +9929,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -9983,6 +10029,7 @@ spec: spec: properties: autoAck: + default: true type: boolean className: type: string @@ -10073,6 +10120,7 @@ spec: type: string type: array typeClassName: + default: '[B' type: string type: object java: @@ -12328,6 +12376,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -12346,7 +12398,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -12380,8 +12434,12 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: + default: public type: string timeout: format: int32 @@ -12565,6 +12623,7 @@ spec: topic: type: string typeClassName: + default: '[B' type: string type: object pod: @@ -14798,6 +14857,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -14816,7 +14879,9 @@ spec: type: string type: object replicas: + default: 1 format: int32 + minimum: 1 type: integer resources: properties: @@ -14846,6 +14911,7 @@ spec: sourceType: type: string tenant: + default: public type: string volumeMounts: items: From d9305ea5dac797e73d66202e96794552bba75a16 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 23 Jul 2021 16:38:00 +0800 Subject: [PATCH 02/15] validate CRDs --- api/v1alpha1/common.go | 100 ++++++- api/v1alpha1/function_types.go | 2 +- api/v1alpha1/function_webhook.go | 142 ++++++--- api/v1alpha1/sink_types.go | 4 +- api/v1alpha1/sink_webhook.go | 99 +++--- api/v1alpha1/source_types.go | 3 +- api/v1alpha1/source_webhook.go | 81 ++--- api/v1alpha1/validate.go | 283 ++++++++++++++++++ .../templates/controller-manager-service.yaml | 25 ++ charts/function-mesh-operator/values.yaml | 2 + ...ompute.functionmesh.io_functionmeshes.yaml | 13 - .../compute.functionmesh.io_functions.yaml | 5 - .../bases/compute.functionmesh.io_sinks.yaml | 4 - .../compute.functionmesh.io_sources.yaml | 4 - config/crd/kustomization.yaml | 16 +- config/manager/kustomization.yaml | 4 +- hack/webhook-create-signed-cert.sh | 131 ++++++++ hack/webhooks/certs/csr.conf | 13 + hack/webhooks/certs/server.csr | 19 ++ hack/webhooks/certs/tls.crt | 20 ++ hack/webhooks/certs/tls.key | 27 ++ main.go | 4 + manifests/crd.yaml | 62 ++-- 23 files changed, 870 insertions(+), 193 deletions(-) create mode 100644 api/v1alpha1/validate.go create mode 100644 charts/function-mesh-operator/templates/controller-manager-service.yaml create mode 100755 hack/webhook-create-signed-cert.sh create mode 100644 hack/webhooks/certs/csr.conf create mode 100644 hack/webhooks/certs/server.csr create mode 100644 hack/webhooks/certs/tls.crt create mode 100644 hack/webhooks/certs/tls.key diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 129c83a41..611bb9dc1 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -20,7 +20,12 @@ package v1alpha1 import ( "encoding/json" + "fmt" + "strings" + autov2beta2 "k8s.io/api/autoscaling/v2beta2" + + pctlutil "github.com/streamnative/pulsarctl/pkg/pulsar/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -150,7 +155,6 @@ type SecretRef struct { } type InputConf struct { - // +kubebuilder:default="[B" TypeClassName string `json:"typeClassName,omitempty"` Topics []string `json:"topics,omitempty"` TopicPattern string `json:"topicPattern,omitempty"` @@ -170,7 +174,6 @@ type ConsumerConfig struct { } type OutputConf struct { - // +kubebuilder:default="[B" TypeClassName string `json:"typeClassName,omitempty"` Topic string `json:"topic,omitempty"` SinkSerdeClassName string `json:"sinkSerdeClassName,omitempty"` @@ -259,6 +262,9 @@ const ( DefaultTenant string = "public" DefaultNamespace string = "default" DefaultCluster string = "kubernetes" + + DefaultResourceCpu int64 = 1 + DefaultResourceMemory int64 = 1073741824 ) func validResourceRequirement(requirements corev1.ResourceRequirements) bool { @@ -285,6 +291,13 @@ const ( FunctionComponent string = "function" SourceComponent string = "source" SinkComponent string = "sink" + + PackageUrlHttp string = "http://" + PackageUrlHttps string = "https://" + // PackageUrlFile string = "file://" + PackageUrlFunction string = "function://" + PackageUrlSource string = "source://" + PackageUrlSink string = "sink://" ) // Config represents untyped YAML configuration. @@ -320,3 +333,86 @@ func (c *Config) UnmarshalJSON(data []byte) error { func (c *Config) DeepCopyInto(out *Config) { out.Data = runtime.DeepCopyJSON(c.Data) } + +func validPackageLocation(packageLocation string) error { + if hasPackageTypePrefix(packageLocation) { + err := isValidPulsarPackageURL(packageLocation) + if err != nil { + return err + } + } else { + if !isFunctionPackageUrlSupported(packageLocation) { + return fmt.Errorf("invalid function package url %s, supported url (http/https)", packageLocation) + } + } + + return nil +} + +func hasPackageTypePrefix(packageLocation string) bool { + lowerCase := strings.ToLower(packageLocation) + return strings.HasPrefix(lowerCase, PackageUrlFunction) || + strings.HasPrefix(lowerCase, PackageUrlSource) || + strings.HasPrefix(lowerCase, PackageUrlSink) +} + +func isValidPulsarPackageURL(packageLocation string) error { + parts := strings.Split(packageLocation, "://") + if len(parts) != 2 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + if !hasPackageTypePrefix(packageLocation) { + return fmt.Errorf("invalid package name %s", packageLocation) + } + rest := parts[1] + if !strings.Contains(rest, "@") { + rest += "@" + } + packageParts := strings.Split(rest, "@") + if len(packageParts) != 2 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + partsWithoutVersion := strings.Split(packageParts[0], "/") + if len(partsWithoutVersion) != 3 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + return nil +} + +func isFunctionPackageUrlSupported(packageLocation string) bool { + // TODO: support file:// schema + lowerCase := strings.ToLower(packageLocation) + return strings.HasPrefix(lowerCase, PackageUrlHttp) || + strings.HasPrefix(lowerCase, PackageUrlHttps) +} + +func collectAllInputTopics(inputs InputConf) []string { + ret := []string{} + if len(inputs.Topics) > 0 { + ret = append(ret, inputs.Topics...) + } + if inputs.TopicPattern != "" { + ret = append(ret, inputs.TopicPattern) + } + if len(inputs.CustomSerdeSources) > 0 { + for k := range inputs.CustomSerdeSources { + ret = append(ret, k) + } + } + if len(inputs.CustomSchemaSources) > 0 { + for k := range inputs.CustomSchemaSources { + ret = append(ret, k) + } + } + if len(inputs.SourceSpecs) > 0 { + for k := range inputs.SourceSpecs { + ret = append(ret, k) + } + } + return ret +} + +func isValidTopicName(topicName string) error { + _, err := pctlutil.GetTopicName(topicName) + return err +} diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index 52ea00070..aebba9d24 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -37,7 +37,6 @@ type FunctionSpec struct { Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` // +kubebuilder:validation:Required - // +kubebuilder:default=1 // +kubebuilder:validation:Minimum=1 Replicas *int32 `json:"replicas,omitempty"` @@ -104,6 +103,7 @@ type FunctionStatus struct { //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector // Function is the Schema for the functions API +// +kubebuilder:pruning:PreserveUnknownFields type Function struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 6d6eaabe9..5f9c14e05 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,13 +48,13 @@ func (r *Function) Default() { functionlog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } if r.Spec.AutoAck == nil { - trueVal := true - r.Spec.AutoAck = &trueVal + r.Spec.AutoAck = new(bool) + *r.Spec.AutoAck = true } if r.Spec.ProcessingGuarantee == "" { @@ -88,17 +89,25 @@ func (r *Function) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Input.TypeClassName == "" { + r.Spec.Input.TypeClassName = "[B" + } + + if r.Spec.Output.TypeClassName == "" { + r.Spec.Output.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -108,69 +117,112 @@ var _ webhook.Validator = &Function{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Function) ValidateCreate() error { - functionlog.Info("validate create", "name", r.Name) + functionlog.Info("validate create function", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Name == "" { + allErrs = append(allErrs, field.Invalid(field.NewPath("name"), r.Name, "function name is not provided")) } - // TODO: verify source conf + if r.Spec.FuncConfig == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("funcConfig"), r.Spec.FuncConfig, "function config is not provided")) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + if r.Spec.Runtime.Java == nil && r.Spec.Runtime.Python == nil && r.Spec.Runtime.Golang == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "runtime cannot be empty")) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "python"), r.Spec.Runtime.Python, "runtime cannot be empty")) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "golang"), r.Spec.Runtime.Golang, "runtime cannot be empty")) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + if (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil) || + (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Golang != nil) || + (r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) || + (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime"), r.Spec.Runtime, "you can only specify one runtime")) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.Timeout != 0 && r.Spec.ProcessingGuarantee != AtleastOnce { - return errors.New("message timeout can only be set for AtleastOnce processing guarantee") + fieldErrs = validatePythonRuntime(r.Spec.Python, r.Spec.ClassName) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxMessageRetry > 0 && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("MaxMessageRetries and Effectively once are not compatible") + fieldErrs = validateGolangRuntime(r.Spec.Golang) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxMessageRetry <= 0 && r.Spec.DeadLetterTopic != "" { - return errors.New("dead letter topic is set but max message retry is set to infinity") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.RetainKeyOrdering && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("when effectively once processing guarantee is specified, retain Key ordering cannot be set") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.RetainKeyOrdering && r.Spec.RetainOrdering { - return errors.New("only one of retain ordering or retain key ordering can be set") + fieldErr = validateAutoAck(r.Spec.AutoAck) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErr = validateTimeout(r.Spec.Timeout, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.FuncConfig != nil { - _, err := json.Marshal(r.Spec.FuncConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + fieldErr = validateRetainKeyOrdering(r.Spec.RetainKeyOrdering, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - // TODO python/golang specific check - return nil + fieldErrs = validateRetainOrderingConflicts(r.Spec.RetainKeyOrdering, r.Spec.RetainOrdering) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) + } + + fieldErr = validateFunctionConfig(r.Spec.FuncConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) + } + + fieldErr = validateLogTopic(r.Spec.LogTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErr = validateDeadLetterTopic(r.Spec.DeadLetterTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Function"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 1631cbe4f..7bbda13ce 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -38,7 +38,6 @@ type SinkSpec struct { Namespace string `json:"namespace,omitempty"` SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector // +kubebuilder:validation:Required - // +kubebuilder:default=1 // +kubebuilder:validation:Minimum=1 Replicas *int32 `json:"replicas,omitempty"` @@ -98,7 +97,8 @@ type SinkStatus struct { // +kubebuilder:subresource:status //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector -// Topic is the Schema for the sinks API +// Sink is the Schema for the sinks API +// +kubebuilder:pruning:PreserveUnknownFields type Sink struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/sink_webhook.go b/api/v1alpha1/sink_webhook.go index 94e6c4374..178464d95 100644 --- a/api/v1alpha1/sink_webhook.go +++ b/api/v1alpha1/sink_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,13 +48,13 @@ func (r *Sink) Default() { sinklog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } if r.Spec.AutoAck == nil { - trueVal := true - r.Spec.AutoAck = &trueVal + r.Spec.AutoAck = new(bool) + *r.Spec.AutoAck = true } if r.Spec.ProcessingGuarantee == "" { @@ -78,17 +79,21 @@ func (r *Sink) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Input.TypeClassName == "" { + r.Spec.Input.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -98,62 +103,74 @@ var _ webhook.Validator = &Sink{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Sink) ValidateCreate() error { - sinklog.Info("validate create", "name", r.Name) + sinklog.Info("validate create sink", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Spec.SinkConfig == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("sinkConfig"), r.Spec.SinkConfig, "sink config is not provided")) } - // TODO: verify inputConf + if r.Spec.Runtime.Java == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "sink must have java runtime specified")) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Timeout != 0 && r.Spec.ProcessingGuarantee != AtleastOnce { - return errors.New("message timeout can only be set for AtleastOnce processing guarantee") + fieldErr = validateAutoAck(r.Spec.AutoAck) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.MaxMessageRetry > 0 && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("MaxMessageRetries and Effectively once are not compatible") + fieldErr = validateTimeout(r.Spec.Timeout, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.MaxMessageRetry <= 0 && r.Spec.DeadLetterTopic != "" { - return errors.New("dead letter topic is set but max message retry is set to infinity") + fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErr = validateSinkConfig(r.Spec.SinkConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SinkConfig != nil { - _, err := json.Marshal(r.Spec.SinkConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + fieldErrs = validateInputOutput(&r.Spec.Input, nil) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - // TODO python/golang specific check + fieldErr = validateDeadLetterTopic(r.Spec.DeadLetterTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } - return nil + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Sink"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 3cc54780d..a479edbeb 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -38,8 +38,6 @@ type SourceSpec struct { ClusterName string `json:"clusterName,omitempty"` SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector // +kubebuilder:validation:Required - // +kubebuilder:default=1 - // +kubebuilder:validation:Minimum=1 Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler @@ -87,6 +85,7 @@ type SourceStatus struct { //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector // Source is the Schema for the sources API +// +kubebuilder:pruning:PreserveUnknownFields type Source struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index a9ce5934e..4b41bfaec 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,15 +48,10 @@ func (r *Source) Default() { sourcelog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } - //if r.Spec.AutoAck == nil { - // trueVal := true - // r.Spec.AutoAck = &trueVal - //} - if r.Spec.ProcessingGuarantee == "" { r.Spec.ProcessingGuarantee = AtleastOnce } @@ -78,11 +74,11 @@ func (r *Source) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } @@ -104,6 +100,10 @@ func (r *Source) Default() { if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Output.TypeClassName == "" { + r.Spec.Output.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -113,49 +113,54 @@ var _ webhook.Validator = &Source{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Source) ValidateCreate() error { - sourcelog.Info("validate create", "name", r.Name) + sourcelog.Info("validate create source", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Spec.SourceConfig == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("sourceConfig"), r.Spec.SourceConfig, "source config is not provided")) + } + + if r.Spec.Runtime.Java == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "source must have java runtime specified")) } - // TODO: verify inputConf + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErr = validateSourceConfig(r.Spec.SourceConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SourceConfig != nil { - _, err := json.Marshal(r.Spec.SourceConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErrs = validateInputOutput(nil, &r.Spec.Output) + if fieldErrs != nil && len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + if len(allErrs) == 0 { + return nil } - // TODO python/golang specific check - return nil + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Source"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go new file mode 100644 index 000000000..98d3c9561 --- /dev/null +++ b/api/v1alpha1/validate.go @@ -0,0 +1,283 @@ +package v1alpha1 + +import ( + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/util/validation/field" +) + +func ensureJavaRuntime(java *JavaRuntime, python *PythonRuntime, golang *GoRuntime) []*field.Error { + var allErrs field.ErrorList + + return allErrs +} + +func validateJavaRuntime(java *JavaRuntime, className string) []*field.Error { + var allErrs field.ErrorList + if java != nil { + if className == "" { + e := field.Invalid(field.NewPath("spec").Child("classname"), className, "class name cannot be empty") + allErrs = append(allErrs, e) + } + if java.Jar == "" { + e := field.Invalid(field.NewPath("spec").Child("java", "jar"), java.Jar, "jar cannot be empty in java runtime") + allErrs = append(allErrs, e) + } + if java.JarLocation != "" { + err := validPackageLocation(java.JarLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("java", "jarLocation"), java.JarLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validatePythonRuntime(python *PythonRuntime, className string) []*field.Error { + var allErrs field.ErrorList + if python != nil { + if className == "" { + e := field.Invalid(field.NewPath("spec").Child("classname"), className, "class name cannot be empty") + allErrs = append(allErrs, e) + } + if python.Py == "" { + e := field.Invalid(field.NewPath("spec").Child("python", "py"), python.Py, "py cannot be empty in python runtime") + allErrs = append(allErrs, e) + } + if python.PyLocation != "" { + err := validPackageLocation(python.PyLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("python", "pyLocation"), python.PyLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validateGolangRuntime(golang *GoRuntime) []*field.Error { + var allErrs field.ErrorList + if golang != nil { + if golang.Go == "" { + e := field.Invalid(field.NewPath("spec").Child("golang", "go"), golang.Go, "go cannot be empty in golang runtime") + allErrs = append(allErrs, e) + } + if golang.GoLocation != "" { + err := validPackageLocation(golang.GoLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("golang", "goLocation"), golang.GoLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error { + var allErrs field.ErrorList + // TODO: allow 0 replicas, currently hpa's min value has to be 1 + if replicas == nil { + e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil") + allErrs = append(allErrs, e) + } + + if replicas != nil && *replicas <= 0 { + e := field.Invalid(field.NewPath("spec").Child("replicas"), *replicas, "replicas cannot be zero or negative") + allErrs = append(allErrs, e) + } + + if maxReplicas != nil && replicas != nil && *replicas > *maxReplicas { + e := field.Invalid(field.NewPath("spec").Child("maxReplicas"), *maxReplicas, "maxReplicas must be greater than or equal to replicas") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateResourceRequirement(requirements corev1.ResourceRequirements) *field.Error { + if !(validResource(requirements.Requests) && validResource(requirements.Limits) && + requirements.Requests.Memory().Cmp(*requirements.Limits.Memory()) <= 0 && + requirements.Requests.Cpu().Cmp(*requirements.Limits.Cpu()) <= 0) { + return field.Invalid(field.NewPath("spec").Child("resources"), requirements, "resource requirement is invalid") + } + return nil +} + +func validateTimeout(timeout int32, processingGuarantee ProcessGuarantee) *field.Error { + if timeout != 0 && processingGuarantee == EffectivelyOnce { + return field.Invalid(field.NewPath("spec").Child("timeout"), timeout, "message timeout can only be set for AtleastOnce processing guarantee") + } + return nil +} + +func validateMaxMessageRetry(maxMessageRetry int32, processingGuarantee ProcessGuarantee, deadLetterTopic string) []*field.Error { + var allErrs field.ErrorList + if maxMessageRetry > 0 && processingGuarantee == EffectivelyOnce { + e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "MaxMessageRetries and Effectively once are not compatible") + allErrs = append(allErrs, e) + } + + if maxMessageRetry <= 0 && deadLetterTopic != "" { + e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "dead letter topic is set but max message retry is set to infinity") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateRetainKeyOrdering(retainKeyOrdering bool, processingGuarantee ProcessGuarantee) *field.Error { + if retainKeyOrdering && processingGuarantee == EffectivelyOnce { + return field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "when effectively once processing guarantee is specified, retain Key ordering cannot be set") + } + return nil +} + +func validateRetainOrderingConflicts(retainKeyOrdering bool, retainOrdering bool) []*field.Error { + var allErrs field.ErrorList + if retainKeyOrdering && retainOrdering { + e := field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "only one of retain ordering or retain key ordering can be set") + allErrs = append(allErrs, e) + e = field.Invalid(field.NewPath("spec").Child("retainOrdering"), retainOrdering, "only one of retain ordering or retain key ordering can be set") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateFunctionConfig(config map[string]string) *field.Error { + if config != nil { + _, err := json.Marshal(config) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("funcConfig"), config, "function config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSinkConfig(config map[string]string) *field.Error { + if config != nil { + _, err := json.Marshal(config) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("sinkConfig"), config, "sink config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSourceConfig(config map[string]string) *field.Error { + if config != nil { + _, err := json.Marshal(config) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("sourceConfig"), config, "source config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSecretsMap(secrets map[string]SecretRef) *field.Error { + if secrets != nil { + _, err := json.Marshal(secrets) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("secretsMap"), secrets, "secrets map is invalid: "+err.Error()) + } + } + return nil +} + +func validateInputOutput(input *InputConf, output *OutputConf) []*field.Error { + var allErrs field.ErrorList + allInputTopics := []string{} + if input != nil { + allInputTopics = collectAllInputTopics(*input) + if len(allInputTopics) == 0 { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + "No input topic(s) specified for the function") + allErrs = append(allErrs, e) + } + + for _, topic := range allInputTopics { + err := isValidTopicName(topic) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + fmt.Sprintf("Input topic %s is invalid", topic)) + allErrs = append(allErrs, e) + } + } + + for topicName, conf := range input.SourceSpecs { + if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName)) + allErrs = append(allErrs, e) + } + + if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName)) + allErrs = append(allErrs, e) + } + } + } + + if output != nil { + if output.Topic != "" { + err := isValidTopicName(output.Topic) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic, + fmt.Sprintf("Output topic %s is invalid", output.Topic)) + allErrs = append(allErrs, e) + } + for _, v := range allInputTopics { + if v == output.Topic { + e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic, + fmt.Sprintf("Output topic %s is also being used as an input topic (topics must be one or the other)", + output.Topic)) + allErrs = append(allErrs, e) + } + } + if output.ProducerConf != nil && output.ProducerConf.CryptoConfig != nil { + if output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "cryptoKeyReaderClassName"), + output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName, "cryptoKeyReader class name required") + allErrs = append(allErrs, e) + } + + if len(output.ProducerConf.CryptoConfig.EncryptionKeys) == 0 { + e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "encryptionKeys"), + output.ProducerConf.CryptoConfig.EncryptionKeys, "must provide encryption key name for crypto key reader") + allErrs = append(allErrs, e) + } + } + } + } + + return allErrs +} + +func validateLogTopic(logTopic string) *field.Error { + if logTopic != "" { + err := isValidTopicName(logTopic) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("logTopic"), logTopic, fmt.Sprintf("Log topic %s is invalid", logTopic)) + } + } + return nil +} + +func validateDeadLetterTopic(deadLetterTopic string) *field.Error { + if deadLetterTopic != "" { + err := isValidTopicName(deadLetterTopic) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("deadLetterTopic"), deadLetterTopic, fmt.Sprintf("DeadLetter topic %s is invalid", deadLetterTopic)) + } + } + return nil +} + +func validateAutoAck(autoAck *bool) *field.Error { + if autoAck == nil { + return field.Invalid(field.NewPath("spec").Child("autoAck"), autoAck, "autoAck cannot be nil") + } + return nil +} diff --git a/charts/function-mesh-operator/templates/controller-manager-service.yaml b/charts/function-mesh-operator/templates/controller-manager-service.yaml new file mode 100644 index 000000000..9b06b1257 --- /dev/null +++ b/charts/function-mesh-operator/templates/controller-manager-service.yaml @@ -0,0 +1,25 @@ +apiVersion: v1 +kind: Service +metadata: + name: function-mesh-controller-manager-service + labels: + app.kubernetes.io/name: { { template "function-mesh-operator.name" . } } + app.kubernetes.io/managed-by: { { .Release.Service } } + app.kubernetes.io/instance: { { .Release.Name } } + app.kubernetes.io/component: controller-manager + helm.sh/chart: { { .Chart.Name } }-{{ .Chart.Version | replace "+" "_" }} +spec: + type: {{ .Values.controllerManager.service.type }} + ports: + - port: 443 + targetPort: webhook + protocol: TCP + name: webhook + - port: 10080 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: { { template "function-mesh-operator.name" . } } + app.kubernetes.io/instance: { { .Release.Name } } + app.kubernetes.io/component: controller-manager diff --git a/charts/function-mesh-operator/values.yaml b/charts/function-mesh-operator/values.yaml index 729ee4246..49c6b346c 100644 --- a/charts/function-mesh-operator/values.yaml +++ b/charts/function-mesh-operator/values.yaml @@ -46,3 +46,5 @@ controllerManager: selector: [] # - k1==v1 # - k2!=v2 + service: + type: ClusterIP diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 19efe410c..ee8dba3c1 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -32,7 +32,6 @@ spec: items: properties: autoAck: - default: true type: boolean className: type: string @@ -128,7 +127,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -212,7 +210,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -2468,7 +2465,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -2506,7 +2502,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -2537,7 +2532,6 @@ spec: items: properties: autoAck: - default: true type: boolean className: type: string @@ -2628,7 +2622,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -4906,7 +4899,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -4947,7 +4939,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -5067,7 +5058,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -7323,9 +7313,7 @@ spec: type: string type: object replicas: - default: 1 format: int32 - minimum: 1 type: integer resources: properties: @@ -7355,7 +7343,6 @@ spec: sourceType: type: string tenant: - default: public type: string volumeMounts: items: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 6d1173d63..a16750fce 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -33,7 +33,6 @@ spec: spec: properties: autoAck: - default: true type: boolean className: type: string @@ -129,7 +128,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -213,7 +211,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -2469,7 +2466,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -2507,7 +2503,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index daf4faf32..339d31927 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -33,7 +33,6 @@ spec: spec: properties: autoAck: - default: true type: boolean className: type: string @@ -124,7 +123,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -2402,7 +2400,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -2443,7 +2440,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 1b1e59187..af0d9a55c 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -122,7 +122,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -2378,9 +2377,7 @@ spec: type: string type: object replicas: - default: 1 format: int32 - minimum: 1 type: integer resources: properties: @@ -2410,7 +2407,6 @@ spec: sourceType: type: string tenant: - default: public type: string volumeMounts: items: diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index e4e7c0e80..c944f5514 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -11,18 +11,18 @@ resources: patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD -#- patches/webhook_in_functionmeshes.yaml -#- patches/webhook_in_functions.yaml -#- patches/webhook_in_sources.yaml -#- patches/webhook_in_sinks.yaml +- patches/webhook_in_functionmeshes.yaml +- patches/webhook_in_functions.yaml +- patches/webhook_in_sources.yaml +- patches/webhook_in_sinks.yaml # +kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD -#- patches/cainjection_in_functionmeshes.yaml -#- patches/cainjection_in_functions.yaml -#- patches/cainjection_in_sources.yaml -#- patches/cainjection_in_sinks.yaml +- patches/cainjection_in_functionmeshes.yaml +- patches/cainjection_in_functions.yaml +- patches/cainjection_in_sources.yaml +- patches/cainjection_in_sinks.yaml # +kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 231f34c60..202679334 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: streamnative/function-mesh - newTag: v0.1.1 + newName: streamnative/function-mesh-operator + newTag: v0.1.6-rc3 diff --git a/hack/webhook-create-signed-cert.sh b/hack/webhook-create-signed-cert.sh new file mode 100755 index 000000000..f7e1018c3 --- /dev/null +++ b/hack/webhook-create-signed-cert.sh @@ -0,0 +1,131 @@ +#!/bin/bash + +set -e + +usage() { + cat <> "${tmpdir}"/csr.conf +[req] +req_extensions = v3_req +distinguished_name = req_distinguished_name +[req_distinguished_name] +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = ${service} +DNS.2 = ${service}.${namespace} +DNS.3 = ${service}.${namespace}.svc +EOF + +openssl genrsa -out "${tmpdir}"/server-key.pem 2048 +openssl req -new -key "${tmpdir}"/server-key.pem -subj "/CN=${service}.${namespace}.svc" -out "${tmpdir}"/server.csr -config "${tmpdir}"/csr.conf + +# clean-up any previously created CSR for our service. Ignore errors if not present. +kubectl delete csr ${csrName} 2>/dev/null || true + +# create server cert/key CSR and send to k8s API +cat <&2 + exit 1 +fi +echo "${serverCert}" | openssl base64 -d -A -out "${tmpdir}"/server-cert.pem + + +# create the secret with CA cert and server cert/key +kubectl create secret generic ${secret} \ + --from-file=key.pem="${tmpdir}"/server-key.pem \ + --from-file=cert.pem="${tmpdir}"/server-cert.pem \ + --dry-run -o yaml | + kubectl -n ${namespace} apply -f - diff --git a/hack/webhooks/certs/csr.conf b/hack/webhooks/certs/csr.conf new file mode 100644 index 000000000..ef16f8182 --- /dev/null +++ b/hack/webhooks/certs/csr.conf @@ -0,0 +1,13 @@ +[req] +req_extensions = v3_req +distinguished_name = req_distinguished_name +[req_distinguished_name] +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = webhook-service +DNS.2 = webhook-service.default +DNS.3 = webhook-service.default.svc diff --git a/hack/webhooks/certs/server.csr b/hack/webhooks/certs/server.csr new file mode 100644 index 000000000..34577d190 --- /dev/null +++ b/hack/webhooks/certs/server.csr @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIIC/jCCAeYCAQAwJjEkMCIGA1UEAwwbd2ViaG9vay1zZXJ2aWNlLmRlZmF1bHQu +c3ZjMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75u8kp1sXvvd80Cy +h1L9qT7ldHf2rLes4GoAWSVSN8EDXjydkk1937tBZNRRWYQSgHC69nW2CKXZvla5 +FxTLzJOvn9FFtVQ02SANRimiicCG3o8B70u1Sx4/O4UhBdlwds7KHo5LoiU84AHU +MJfkGT405yhrJIhk5vbyHYds6UyHrjljvQgh/dEQu1Ug49c5BJfF+g/FR4+MF5C7 +otm8uE3qE5WX2zw1/AIEJAAbNEYumzQ1y/b0JWTHH3QPtozhPL4fCtOAzM3XP/Z/ +wSw7B6LBqgc+wWyIyfijRW9t0TrAMJQV1ZRkgeuNgttIH0kq/4WFFb47TRayGHYN +RtoKXwIDAQABoIGSMIGPBgkqhkiG9w0BCQ4xgYEwfzAJBgNVHRMEAjAAMAsGA1Ud +DwQEAwIF4DATBgNVHSUEDDAKBggrBgEFBQcDATBQBgNVHREESTBHgg93ZWJob29r +LXNlcnZpY2WCF3dlYmhvb2stc2VydmljZS5kZWZhdWx0ght3ZWJob29rLXNlcnZp +Y2UuZGVmYXVsdC5zdmMwDQYJKoZIhvcNAQELBQADggEBAL75H5mnjzLrQmM5tuCz +/MGCvv6j+ieZ0B1Shu/67J3shK7Fo7Hj/j23NkyOC7t/3aNA9JMiHvpcbOqiGkkZ +zGW5Idcjf3TzhJcZT5pMIUGFr9zgf7HUUon+pgKwio7Pg3kBYcVEGgzcHSkyy8mP +AVpKBa46q9eTquZw897nuIc46NUreo/A+LXjoUXmJ7d8iN8jQPaP/YfSI/d2tFe5 +O9DyiGW4lozAMJFdyW1JJuNJmJ9vtnH5aA6YwGsUNzo+E2kzpOy6pdLcyH4mPQGN +ikG6HGuVZ5mV5tDcO3bem+R0WafcNHLZKjYgJEKDgcypS8dTcMn6HRksPIoGlFOv +rZk= +-----END CERTIFICATE REQUEST----- diff --git a/hack/webhooks/certs/tls.crt b/hack/webhooks/certs/tls.crt new file mode 100644 index 000000000..2344c7c55 --- /dev/null +++ b/hack/webhooks/certs/tls.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDTjCCAjagAwIBAgIQXwkFIfJKRdVqLZ2nEHzmRDANBgkqhkiG9w0BAQsFADAV +MRMwEQYDVQQDEwprdWJlcm5ldGVzMB4XDTIxMDcyMzA3MjI0N1oXDTIyMDcyMzA3 +MjI0N1owJjEkMCIGA1UEAxMbd2ViaG9vay1zZXJ2aWNlLmRlZmF1bHQuc3ZjMIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA75u8kp1sXvvd80Cyh1L9qT7l +dHf2rLes4GoAWSVSN8EDXjydkk1937tBZNRRWYQSgHC69nW2CKXZvla5FxTLzJOv +n9FFtVQ02SANRimiicCG3o8B70u1Sx4/O4UhBdlwds7KHo5LoiU84AHUMJfkGT40 +5yhrJIhk5vbyHYds6UyHrjljvQgh/dEQu1Ug49c5BJfF+g/FR4+MF5C7otm8uE3q +E5WX2zw1/AIEJAAbNEYumzQ1y/b0JWTHH3QPtozhPL4fCtOAzM3XP/Z/wSw7B6LB +qgc+wWyIyfijRW9t0TrAMJQV1ZRkgeuNgttIH0kq/4WFFb47TRayGHYNRtoKXwID +AQABo4GIMIGFMA4GA1UdDwEB/wQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAM +BgNVHRMBAf8EAjAAMFAGA1UdEQRJMEeCD3dlYmhvb2stc2VydmljZYIXd2ViaG9v +ay1zZXJ2aWNlLmRlZmF1bHSCG3dlYmhvb2stc2VydmljZS5kZWZhdWx0LnN2YzAN +BgkqhkiG9w0BAQsFAAOCAQEAhz63iGCHgvSsk/s1i3YYBNDjchGxu7z0/SZtGhZT +GrVRwqzXd4WIq2tZsGeOMzS2kVyT8UPeI3n6FXnRkc2XqWag5g+I+KpBsmCqm4XB +uCI+sfNML4HkljTDJuZZtGAZi+hUbrlSbMNIugIqWC54m1pin7H8ELgWEHVCY+d9 +9x13Kj9Naq4Aenw6vdudCy0EFsD42GlSuU3McaqccteI59iC2yN2iEJRkJjRyQEG +01bdxJfIAu0BduTMRGFQmTAYkIQFb+wgBjoG6VlLqOsQjOsYgivpEPFBsK2UCR6P +NqT1LF5vOmQSWCUGrLqVRt5g3TBHLPFB1RWrq/XtpR37sQ== +-----END CERTIFICATE----- diff --git a/hack/webhooks/certs/tls.key b/hack/webhooks/certs/tls.key new file mode 100644 index 000000000..ce092478f --- /dev/null +++ b/hack/webhooks/certs/tls.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA75u8kp1sXvvd80Cyh1L9qT7ldHf2rLes4GoAWSVSN8EDXjyd +kk1937tBZNRRWYQSgHC69nW2CKXZvla5FxTLzJOvn9FFtVQ02SANRimiicCG3o8B +70u1Sx4/O4UhBdlwds7KHo5LoiU84AHUMJfkGT405yhrJIhk5vbyHYds6UyHrjlj +vQgh/dEQu1Ug49c5BJfF+g/FR4+MF5C7otm8uE3qE5WX2zw1/AIEJAAbNEYumzQ1 +y/b0JWTHH3QPtozhPL4fCtOAzM3XP/Z/wSw7B6LBqgc+wWyIyfijRW9t0TrAMJQV +1ZRkgeuNgttIH0kq/4WFFb47TRayGHYNRtoKXwIDAQABAoIBAHuRb1CVSvw5HB+E +8A0F6bdMrnzAGUC9gLRDNmbC7oG+UoRY8lIvEIk/QtxC+qkViY0flevgjL4s+fxF +MTMMEIYya9IZ1L9M6+xFsVLLyf0MgwFn8vF7s8VHlgiWAY/IsU00G9xiOfMnDRvh +FZLx4jp6cEH+ADNY++B43igIXUx6ocqVw2Sj7JL6WVaHCp86+silhBYJy5yEmZCb +8Fsvimtu1dBCqdkZWXTWuRI1X37YGke2JKhR8JkICHtQnTbldxvvXHkWY+gf1yov +KWkFkXp000Xh+uwVrbE9lUV9EXcdkSe1UCFGo1jILq9HKHi25pK2GdvwZDRLEhn/ +X3fzJ4ECgYEA+34hYZnZR0wB0ddFg6bG7I3rengBbAd14FFLvDEwpk+cFmc76kVV +lZAhm7Pj0qTnjxkeYeClo48yfyZ0NR/mXRm4y2y6NelEbxdvZgciEiAuXNEqwr/J +rugWPHiI5LrCJb8y18e+Z6+jUbj7q6ZAz+bGjiu7iwj0TjQg1P/mPb8CgYEA8+cU +bvLJpqLpSLsgQQpsldEnmdDQ0Rci4X+LHUs2q+Pj+aSe7CrKJISJmxqL6tkGuJiT +/AO/h9t/XK+gGBiuzXJOYuY7IKKFGEdDIXtFTAi4HHnO3AjIJjH3cHzOXat7duEC +DGRIz5tHxJoI4Oanf+lHkoHkBrcekdhqLLlGm2ECgYEA5FhslMLtDphLmq0cV+Hp +AkYFxegKWn0XoToPSuta4daswYawVxDnGWqJqWcXd131Re+nr3Ua0DwnvF087DNv +pZ7CNCoflO2dDkU4B2p4m/6OOgZNDhC3XdEqx3Ml0JBl0YId9uBM2LYX0w7jq8HD +XUGLjPDWUvIpaIxaNyu2cN8CgYAha4RtP29KGgCFKr7cqqT+Uk5u0SILxHPPGj7h +oLC5FNzkFB6Clcpk/pQsYJLTdzFokd+VJJ5V60mkVZxGMPuiwwrimlF3w00haoJU +UCPKq1cu77HcjIeAVFBnZXKRezHLtEEnmYYwjEJlOgYFaxWAfWKFPK8WDHlv2ouR +3UFDgQKBgQD7NKd4WJg9TqcG9x41AUfY9pUTtb1ecRbJngJH4jj0QlzVwLTmpiJ1 +GbZ5J5yf+8rmzLT13uI0KMey55UfhYjtJHqU96kkGZNA8OGfsmQaRajHND633e1B +cb6sK/vbLfZoUbqyeHfeWls66vCjvXmY07Pq2vb9iCKxTVNA+sUaCA== +-----END RSA PRIVATE KEY----- diff --git a/main.go b/main.go index 2788d1c73..f29f3c027 100644 --- a/main.go +++ b/main.go @@ -48,6 +48,7 @@ func init() { func main() { var metricsAddr string var leaderElectionID string + var certDir string var enableLeaderElection bool flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&leaderElectionID, "leader-election-id", "a3f45fce.functionmesh.io", @@ -55,6 +56,8 @@ func main() { flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.StringVar(&certDir, "cert-dir", "", + "CertDir is the directory that contains the server key and certificate.\n\tif not set, webhook server would look up the server key and certificate in\n\t{TempDir}/k8s-webhook-server/serving-certs. The server key and certificate\n\tmust be named tls.key and tls.crt, respectively.") flag.Parse() ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -65,6 +68,7 @@ func main() { Port: 9443, LeaderElection: enableLeaderElection, LeaderElectionID: leaderElectionID, + CertDir: certDir, }) if err != nil { setupLog.Error(err, "unable to start manager") diff --git a/manifests/crd.yaml b/manifests/crd.yaml index cee7de59a..30a3c1c5a 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -2,10 +2,19 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: functionmeshes.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: FunctionMesh @@ -30,7 +39,6 @@ spec: items: properties: autoAck: - default: true type: boolean className: type: string @@ -126,7 +134,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -210,7 +217,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -2466,7 +2472,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -2504,7 +2509,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -2535,7 +2539,6 @@ spec: items: properties: autoAck: - default: true type: boolean className: type: string @@ -2626,7 +2629,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -4904,7 +4906,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -4945,7 +4946,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -5065,7 +5065,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -7321,9 +7320,7 @@ spec: type: string type: object replicas: - default: 1 format: int32 - minimum: 1 type: integer resources: properties: @@ -7353,7 +7350,6 @@ spec: sourceType: type: string tenant: - default: public type: string volumeMounts: items: @@ -7431,10 +7427,19 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: functions.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Function @@ -7460,7 +7465,6 @@ spec: spec: properties: autoAck: - default: true type: boolean className: type: string @@ -7556,7 +7560,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -7640,7 +7643,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -9896,7 +9898,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -9934,7 +9935,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -10000,10 +10000,19 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: sinks.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Sink @@ -10029,7 +10038,6 @@ spec: spec: properties: autoAck: - default: true type: boolean className: type: string @@ -10120,7 +10128,6 @@ spec: type: string type: array typeClassName: - default: '[B' type: string type: object java: @@ -12398,7 +12405,6 @@ spec: type: string type: object replicas: - default: 1 format: int32 minimum: 1 type: integer @@ -12439,7 +12445,6 @@ spec: - earliest type: string tenant: - default: public type: string timeout: format: int32 @@ -12505,10 +12510,19 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: sources.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Source @@ -12623,7 +12637,6 @@ spec: topic: type: string typeClassName: - default: '[B' type: string type: object pod: @@ -14879,9 +14892,7 @@ spec: type: string type: object replicas: - default: 1 format: int32 - minimum: 1 type: integer resources: properties: @@ -14911,7 +14922,6 @@ spec: sourceType: type: string tenant: - default: public type: string volumeMounts: items: From e913f3714d93c64e71640138990a4a2f580f43ee Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 23 Jul 2021 17:31:21 +0800 Subject: [PATCH 03/15] fix validation --- .ci/clusters/compute_v1alpha1_function.yaml | 2 +- .ci/clusters/compute_v1alpha1_go_function.yaml | 2 +- .ci/clusters/compute_v1alpha1_py_function.yaml | 2 +- Makefile | 2 +- config/crd/bases/compute.functionmesh.io_functionmeshes.yaml | 1 + config/crd/bases/compute.functionmesh.io_functions.yaml | 1 + config/crd/bases/compute.functionmesh.io_sinks.yaml | 1 + config/crd/bases/compute.functionmesh.io_sources.yaml | 1 + config/samples/compute_v1alpha1_function.yaml | 2 +- config/samples/compute_v1alpha1_function_crypto.yaml | 2 +- .../samples/compute_v1alpha1_function_key_based_batcher.yaml | 2 +- config/samples/compute_v1alpha1_go_function.yaml | 2 +- config/samples/compute_v1alpha1_py_function.yaml | 2 +- images/README.md | 4 ++-- main.go | 2 +- manifests/crd.yaml | 4 ++++ 16 files changed, 20 insertions(+), 12 deletions(-) diff --git a/.ci/clusters/compute_v1alpha1_function.yaml b/.ci/clusters/compute_v1alpha1_function.yaml index 8a7b5441a..56b4576a6 100644 --- a/.ci/clusters/compute_v1alpha1_function.yaml +++ b/.ci/clusters/compute_v1alpha1_function.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.0.14 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/.ci/clusters/compute_v1alpha1_go_function.yaml b/.ci/clusters/compute_v1alpha1_go_function.yaml index 6d6a22a86..864d7c133 100644 --- a/.ci/clusters/compute_v1alpha1_go_function.yaml +++ b/.ci/clusters/compute_v1alpha1_go_function.yaml @@ -6,7 +6,7 @@ metadata: spec: image: streamnative/pulsar-functions-go-sample:2.8.0.14 forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 3 logTopic: persistent://public/default/go-function-logs diff --git a/.ci/clusters/compute_v1alpha1_py_function.yaml b/.ci/clusters/compute_v1alpha1_py_function.yaml index 4ad156b38..347b79ed4 100644 --- a/.ci/clusters/compute_v1alpha1_py_function.yaml +++ b/.ci/clusters/compute_v1alpha1_py_function.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-python-sample:2.8.0.14 className: exclamation_function.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/py-function-logs diff --git a/Makefile b/Makefile index c9b6e207f..b1694117f 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL) # Image URL to use all building/pushing image targets IMG ?= ${DOCKER_REPO}/function-mesh-operator:v$(VERSION) # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) -CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true" +CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true,preserveUnknownFields=false" # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index ee8dba3c1..aab626cf1 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -14,6 +14,7 @@ spec: listKind: FunctionMeshList plural: functionmeshes singular: functionmesh + preserveUnknownFields: false scope: Namespaced subresources: status: {} diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index a16750fce..71b740140 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -14,6 +14,7 @@ spec: listKind: FunctionList plural: functions singular: function + preserveUnknownFields: false scope: Namespaced subresources: scale: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 339d31927..563095745 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -14,6 +14,7 @@ spec: listKind: SinkList plural: sinks singular: sink + preserveUnknownFields: false scope: Namespaced subresources: scale: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index af0d9a55c..1cb561aed 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -14,6 +14,7 @@ spec: listKind: SourceList plural: sources singular: source + preserveUnknownFields: false scope: Namespaced subresources: scale: diff --git a/config/samples/compute_v1alpha1_function.yaml b/config/samples/compute_v1alpha1_function.yaml index 7e285f69d..9a97d98e8 100644 --- a/config/samples/compute_v1alpha1_function.yaml +++ b/config/samples/compute_v1alpha1_function.yaml @@ -6,7 +6,7 @@ metadata: spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_function_crypto.yaml b/config/samples/compute_v1alpha1_function_crypto.yaml index f78728e7b..9755472f9 100644 --- a/config/samples/compute_v1alpha1_function_crypto.yaml +++ b/config/samples/compute_v1alpha1_function_crypto.yaml @@ -6,7 +6,7 @@ metadata: spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_function_key_based_batcher.yaml b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml index c50fe7d13..ada9e13c0 100644 --- a/config/samples/compute_v1alpha1_function_key_based_batcher.yaml +++ b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml @@ -7,7 +7,7 @@ spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction image: streamnative/pulsar-all:2.7.1 forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_go_function.yaml b/config/samples/compute_v1alpha1_go_function.yaml index a462d1d5d..f2d6d4d94 100644 --- a/config/samples/compute_v1alpha1_go_function.yaml +++ b/config/samples/compute_v1alpha1_go_function.yaml @@ -5,7 +5,7 @@ metadata: namespace: default spec: forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/go-function-logs diff --git a/config/samples/compute_v1alpha1_py_function.yaml b/config/samples/compute_v1alpha1_py_function.yaml index 45bc12121..da6a14533 100644 --- a/config/samples/compute_v1alpha1_py_function.yaml +++ b/config/samples/compute_v1alpha1_py_function.yaml @@ -6,7 +6,7 @@ metadata: spec: className: exclamation_function.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/py-function-logs diff --git a/images/README.md b/images/README.md index a072ea33d..554b84c38 100644 --- a/images/README.md +++ b/images/README.md @@ -62,7 +62,7 @@ spec: image: streamnative/pulsar-functions-java-runner:2.7.1 # using java function runner className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs @@ -120,7 +120,7 @@ spec: image: streamnative/example-function-image:latest # using function image here className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/main.go b/main.go index f29f3c027..8e98c05a5 100644 --- a/main.go +++ b/main.go @@ -68,7 +68,7 @@ func main() { Port: 9443, LeaderElection: enableLeaderElection, LeaderElectionID: leaderElectionID, - CertDir: certDir, + CertDir: certDir, }) if err != nil { setupLog.Error(err, "unable to start manager") diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 30a3c1c5a..3413c258c 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -21,6 +21,7 @@ spec: listKind: FunctionMeshList plural: functionmeshes singular: functionmesh + preserveUnknownFields: false scope: Namespaced subresources: status: {} @@ -7446,6 +7447,7 @@ spec: listKind: FunctionList plural: functions singular: function + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -10019,6 +10021,7 @@ spec: listKind: SinkList plural: sinks singular: sink + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -12529,6 +12532,7 @@ spec: listKind: SourceList plural: sources singular: source + preserveUnknownFields: false scope: Namespaced subresources: scale: From a9681f3001ab1c0e0d1b9254555298712e9c62ba Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 10:43:10 +0800 Subject: [PATCH 04/15] fix CI --- api/v1alpha1/function_webhook.go | 4 ++-- api/v1alpha1/validate.go | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 5f9c14e05..2b3152bab 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -144,12 +144,12 @@ func (r *Function) ValidateCreate() error { } fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } fieldErrs = validatePythonRuntime(r.Spec.Python, r.Spec.ClassName) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go index 98d3c9561..c98b93932 100644 --- a/api/v1alpha1/validate.go +++ b/api/v1alpha1/validate.go @@ -9,12 +9,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" ) -func ensureJavaRuntime(java *JavaRuntime, python *PythonRuntime, golang *GoRuntime) []*field.Error { - var allErrs field.ErrorList - - return allErrs -} - func validateJavaRuntime(java *JavaRuntime, className string) []*field.Error { var allErrs field.ErrorList if java != nil { From 656585ef009f9325d2473f55a1fa57c29c6089f2 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 11:56:11 +0800 Subject: [PATCH 05/15] fix CI --- .../templates/controller-manager-service.yaml | 6 +++--- .../java/io/functionmesh/compute/util/FunctionsUtil.java | 2 +- .../main/java/io/functionmesh/compute/util/SinksUtil.java | 3 ++- .../main/java/io/functionmesh/compute/util/SourcesUtil.java | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/charts/function-mesh-operator/templates/controller-manager-service.yaml b/charts/function-mesh-operator/templates/controller-manager-service.yaml index 9b06b1257..155370ce4 100644 --- a/charts/function-mesh-operator/templates/controller-manager-service.yaml +++ b/charts/function-mesh-operator/templates/controller-manager-service.yaml @@ -3,9 +3,9 @@ kind: Service metadata: name: function-mesh-controller-manager-service labels: - app.kubernetes.io/name: { { template "function-mesh-operator.name" . } } - app.kubernetes.io/managed-by: { { .Release.Service } } - app.kubernetes.io/instance: { { .Release.Name } } + app.kubernetes.io/name: {{ template "function-mesh-operator.name" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: controller-manager helm.sh/chart: { { .Chart.Name } }-{{ .Chart.Version | replace "+" "_" }} spec: diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java index 254a011f9..a516ab2d2 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java @@ -351,7 +351,7 @@ public static FunctionConfig createFunctionConfigFromV1alpha1Function(String ten functionConfig.setParallelism(v1alpha1FunctionSpec.getReplicas()); if (v1alpha1FunctionSpec.getProcessingGuarantee() != null) { functionConfig.setProcessingGuarantees( - CommonUtil.convertProcessingGuarantee(v1alpha1FunctionSpec.getProcessingGuarantee())); + CommonUtil.convertProcessingGuarantee(v1alpha1FunctionSpec.getProcessingGuarantee().getValue())); } CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions(); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index 2fefd3569..e291e6f99 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -292,7 +292,8 @@ public static SinkConfig createSinkConfigFromV1alpha1Sink( } sinkConfig.setParallelism(v1alpha1SinkSpec.getReplicas()); if (v1alpha1SinkSpec.getProcessingGuarantee() != null) { - sinkConfig.setProcessingGuarantees(CommonUtil.convertProcessingGuarantee(v1alpha1SinkSpec.getProcessingGuarantee())); + sinkConfig.setProcessingGuarantees( + CommonUtil.convertProcessingGuarantee(v1alpha1SinkSpec.getProcessingGuarantee().getValue())); } CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions(); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index 5df92293d..2e8f1c30e 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -250,7 +250,7 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S sourceConfig.setParallelism(v1alpha1SourceSpec.getReplicas()); if (v1alpha1SourceSpec.getProcessingGuarantee() != null) { sourceConfig.setProcessingGuarantees( - CommonUtil.convertProcessingGuarantee(v1alpha1SourceSpec.getProcessingGuarantee())); + CommonUtil.convertProcessingGuarantee(v1alpha1SourceSpec.getProcessingGuarantee().getValue())); } sourceConfig.setClassName(v1alpha1SourceSpec.getClassName()); From 50d7a725e921ba189934bef2112cff8aefabf480 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 12:09:35 +0800 Subject: [PATCH 06/15] update style --- api/v1alpha1/common.go | 27 +++++++++++++-------------- api/v1alpha1/function_webhook.go | 12 ++++++------ api/v1alpha1/sink_webhook.go | 10 +++++----- api/v1alpha1/source_webhook.go | 8 ++++---- api/v1alpha1/validate.go | 4 +--- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 611bb9dc1..63fb1952c 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -263,7 +263,7 @@ const ( DefaultNamespace string = "default" DefaultCluster string = "kubernetes" - DefaultResourceCpu int64 = 1 + DefaultResourceCPU int64 = 1 DefaultResourceMemory int64 = 1073741824 ) @@ -292,12 +292,11 @@ const ( SourceComponent string = "source" SinkComponent string = "sink" - PackageUrlHttp string = "http://" - PackageUrlHttps string = "https://" - // PackageUrlFile string = "file://" - PackageUrlFunction string = "function://" - PackageUrlSource string = "source://" - PackageUrlSink string = "sink://" + PackageURLHTTP string = "http://" + PackageURLHTTPS string = "https://" + PackageURLFunction string = "function://" + PackageURLSource string = "source://" + PackageURLSink string = "sink://" ) // Config represents untyped YAML configuration. @@ -341,7 +340,7 @@ func validPackageLocation(packageLocation string) error { return err } } else { - if !isFunctionPackageUrlSupported(packageLocation) { + if !isFunctionPackageURLSupported(packageLocation) { return fmt.Errorf("invalid function package url %s, supported url (http/https)", packageLocation) } } @@ -351,9 +350,9 @@ func validPackageLocation(packageLocation string) error { func hasPackageTypePrefix(packageLocation string) bool { lowerCase := strings.ToLower(packageLocation) - return strings.HasPrefix(lowerCase, PackageUrlFunction) || - strings.HasPrefix(lowerCase, PackageUrlSource) || - strings.HasPrefix(lowerCase, PackageUrlSink) + return strings.HasPrefix(lowerCase, PackageURLFunction) || + strings.HasPrefix(lowerCase, PackageURLSource) || + strings.HasPrefix(lowerCase, PackageURLSink) } func isValidPulsarPackageURL(packageLocation string) error { @@ -379,11 +378,11 @@ func isValidPulsarPackageURL(packageLocation string) error { return nil } -func isFunctionPackageUrlSupported(packageLocation string) bool { +func isFunctionPackageURLSupported(packageLocation string) bool { // TODO: support file:// schema lowerCase := strings.ToLower(packageLocation) - return strings.HasPrefix(lowerCase, PackageUrlHttp) || - strings.HasPrefix(lowerCase, PackageUrlHttps) + return strings.HasPrefix(lowerCase, PackageURLHTTP) || + strings.HasPrefix(lowerCase, PackageURLHTTPS) } func collectAllInputTopics(inputs InputConf) []string { diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 2b3152bab..0d1418035 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -89,7 +89,7 @@ func (r *Function) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { @@ -154,12 +154,12 @@ func (r *Function) ValidateCreate() error { } fieldErrs = validateGolangRuntime(r.Spec.Golang) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -179,7 +179,7 @@ func (r *Function) ValidateCreate() error { } fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -189,7 +189,7 @@ func (r *Function) ValidateCreate() error { } fieldErrs = validateRetainOrderingConflicts(r.Spec.RetainKeyOrdering, r.Spec.RetainOrdering) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -204,7 +204,7 @@ func (r *Function) ValidateCreate() error { } fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/sink_webhook.go b/api/v1alpha1/sink_webhook.go index 178464d95..7b7b488e4 100644 --- a/api/v1alpha1/sink_webhook.go +++ b/api/v1alpha1/sink_webhook.go @@ -79,7 +79,7 @@ func (r *Sink) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { @@ -117,12 +117,12 @@ func (r *Sink) ValidateCreate() error { } fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -142,7 +142,7 @@ func (r *Sink) ValidateCreate() error { } fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -157,7 +157,7 @@ func (r *Sink) ValidateCreate() error { } fieldErrs = validateInputOutput(&r.Spec.Input, nil) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index 4b41bfaec..6ebed1ce8 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -74,7 +74,7 @@ func (r *Source) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCpu) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { @@ -127,12 +127,12 @@ func (r *Source) ValidateCreate() error { } fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -152,7 +152,7 @@ func (r *Source) ValidateCreate() error { } fieldErrs = validateInputOutput(nil, &r.Spec.Output) - if fieldErrs != nil && len(fieldErrs) > 0 { + if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go index c98b93932..51d10e8e6 100644 --- a/api/v1alpha1/validate.go +++ b/api/v1alpha1/validate.go @@ -92,9 +92,7 @@ func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error } func validateResourceRequirement(requirements corev1.ResourceRequirements) *field.Error { - if !(validResource(requirements.Requests) && validResource(requirements.Limits) && - requirements.Requests.Memory().Cmp(*requirements.Limits.Memory()) <= 0 && - requirements.Requests.Cpu().Cmp(*requirements.Limits.Cpu()) <= 0) { + if !validResourceRequirement(requirements) { return field.Invalid(field.NewPath("spec").Child("resources"), requirements, "resource requirement is invalid") } return nil From c295c09d58e83d3246e14219b5ab4a01b4d20a7e Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 12:25:07 +0800 Subject: [PATCH 07/15] goimport --- api/v1alpha1/common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 63fb1952c..e04aaba99 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -292,8 +292,8 @@ const ( SourceComponent string = "source" SinkComponent string = "sink" - PackageURLHTTP string = "http://" - PackageURLHTTPS string = "https://" + PackageURLHTTP string = "http://" + PackageURLHTTPS string = "https://" PackageURLFunction string = "function://" PackageURLSource string = "source://" PackageURLSink string = "sink://" From ac225326f832b79c88c80954d70dcecd40042ab9 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 12:43:10 +0800 Subject: [PATCH 08/15] fix CI --- api/v1alpha1/validate.go | 17 +++++++++++++++++ license_test.go | 3 ++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go index 51d10e8e6..36b92704f 100644 --- a/api/v1alpha1/validate.go +++ b/api/v1alpha1/validate.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package v1alpha1 import ( diff --git a/license_test.go b/license_test.go index 06b9d8241..0f3334ec6 100644 --- a/license_test.go +++ b/license_test.go @@ -70,7 +70,8 @@ var skip = map[string]bool{ ".github/workflows/project.yml": true, ".github/workflows/release.yml": true, ".github/workflows/release-node.yml": true, - ".github/release-drafter.yml": true, + "hack/webhook-create-signed-cert.sh": true, + "hack/webhooks/certs/csr.conf": true, } func TestLicense(t *testing.T) { From 35c38250903c06282ca256ece5f00e39a0bfa1b9 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 13:10:56 +0800 Subject: [PATCH 09/15] update charts and move webhook support into separate PR --- .../templates/controller-manager-service.yaml | 25 ------------------- charts/function-mesh-operator/values.yaml | 2 -- 2 files changed, 27 deletions(-) delete mode 100644 charts/function-mesh-operator/templates/controller-manager-service.yaml diff --git a/charts/function-mesh-operator/templates/controller-manager-service.yaml b/charts/function-mesh-operator/templates/controller-manager-service.yaml deleted file mode 100644 index 155370ce4..000000000 --- a/charts/function-mesh-operator/templates/controller-manager-service.yaml +++ /dev/null @@ -1,25 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: function-mesh-controller-manager-service - labels: - app.kubernetes.io/name: {{ template "function-mesh-operator.name" . }} - app.kubernetes.io/managed-by: {{ .Release.Service }} - app.kubernetes.io/instance: {{ .Release.Name }} - app.kubernetes.io/component: controller-manager - helm.sh/chart: { { .Chart.Name } }-{{ .Chart.Version | replace "+" "_" }} -spec: - type: {{ .Values.controllerManager.service.type }} - ports: - - port: 443 - targetPort: webhook - protocol: TCP - name: webhook - - port: 10080 - targetPort: http - protocol: TCP - name: http - selector: - app.kubernetes.io/name: { { template "function-mesh-operator.name" . } } - app.kubernetes.io/instance: { { .Release.Name } } - app.kubernetes.io/component: controller-manager diff --git a/charts/function-mesh-operator/values.yaml b/charts/function-mesh-operator/values.yaml index 49c6b346c..729ee4246 100644 --- a/charts/function-mesh-operator/values.yaml +++ b/charts/function-mesh-operator/values.yaml @@ -46,5 +46,3 @@ controllerManager: selector: [] # - k1==v1 # - k2!=v2 - service: - type: ClusterIP From 07abb2dc45ac222245a2cda7fde48d5fcd070709 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 27 Jul 2021 14:01:28 +0800 Subject: [PATCH 10/15] code style --- api/v1alpha1/common.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index e04aaba99..2494d8aa2 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -127,6 +127,7 @@ type Runtime struct { Golang *GoRuntime `json:"golang,omitempty"` } +// JavaRuntime contains the java runtime configs // +kubebuilder:validation:Optional type JavaRuntime struct { // +kubebuilder:validation:Required @@ -135,6 +136,7 @@ type JavaRuntime struct { ExtraDependenciesDir string `json:"extraDependenciesDir,omitempty"` } +// PythonRuntime contains the python runtime configs // +kubebuilder:validation:Optional type PythonRuntime struct { // +kubebuilder:validation:Required @@ -142,6 +144,7 @@ type PythonRuntime struct { PyLocation string `json:"pyLocation,omitempty"` } +// GoRuntime contains the golang runtime configs // +kubebuilder:validation:Optional type GoRuntime struct { // +kubebuilder:validation:Required @@ -206,6 +209,7 @@ type CryptoSecret struct { //AsEnv string `json:"asEnv,omitempty"` } +// SubscribePosition enum type // +kubebuilder:validation:Enum=latest;earliest type SubscribePosition string @@ -251,6 +255,7 @@ const ( NoAction ReconcileAction = "NoAction" ) +// ProcessGuarantee enum type // +kubebuilder:validation:Enum=atleast_once;atmost_once;effectively_once type ProcessGuarantee string From fdfe12822b21b3803e69854408a7cf06cc25355c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 23 Sep 2021 21:34:08 +0800 Subject: [PATCH 11/15] fix CI --- api/v1alpha1/validate.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go index 36b92704f..1d973a910 100644 --- a/api/v1alpha1/validate.go +++ b/api/v1alpha1/validate.go @@ -154,9 +154,9 @@ func validateRetainOrderingConflicts(retainKeyOrdering bool, retainOrdering bool return allErrs } -func validateFunctionConfig(config map[string]string) *field.Error { +func validateFunctionConfig(config *Config) *field.Error { if config != nil { - _, err := json.Marshal(config) + _, err := config.MarshalJSON() if err != nil { return field.Invalid(field.NewPath("spec").Child("funcConfig"), config, "function config is invalid: "+err.Error()) } @@ -164,9 +164,9 @@ func validateFunctionConfig(config map[string]string) *field.Error { return nil } -func validateSinkConfig(config map[string]string) *field.Error { +func validateSinkConfig(config *Config) *field.Error { if config != nil { - _, err := json.Marshal(config) + _, err := config.MarshalJSON() if err != nil { return field.Invalid(field.NewPath("spec").Child("sinkConfig"), config, "sink config is invalid: "+err.Error()) } @@ -174,9 +174,9 @@ func validateSinkConfig(config map[string]string) *field.Error { return nil } -func validateSourceConfig(config map[string]string) *field.Error { +func validateSourceConfig(config *Config) *field.Error { if config != nil { - _, err := json.Marshal(config) + _, err := config.MarshalJSON() if err != nil { return field.Invalid(field.NewPath("spec").Child("sourceConfig"), config, "source config is invalid: "+err.Error()) } From eab88e9e9e936b383ae65ea20108e44fbe570742 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 23 Sep 2021 21:58:36 +0800 Subject: [PATCH 12/15] fix CI --- api/v1alpha1/function_types.go | 4 +--- api/v1alpha1/sink_types.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index aebba9d24..b21c82ad3 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -54,9 +54,7 @@ type FunctionSpec struct { SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - Timeout int32 `json:"timeout,omitempty"` - // +kubebuilder:default=true - // +kubebuilder:validation:Required + Timeout int32 `json:"timeout,omitempty"` AutoAck *bool `json:"autoAck,omitempty"` MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 7bbda13ce..dd8df6aab 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -54,15 +54,13 @@ type SinkSpec struct { SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - Timeout int32 `json:"timeout,omitempty"` - NegativeAckRedeliveryDelayMs int32 `json:"negativeAckRedeliveryDelayMs,omitempty"` - // +kubebuilder:default=true - // +kubebuilder:validation:Required - AutoAck *bool `json:"autoAck,omitempty"` - MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` - ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` - RetainOrdering bool `json:"retainOrdering,omitempty"` - DeadLetterTopic string `json:"deadLetterTopic,omitempty"` + Timeout int32 `json:"timeout,omitempty"` + NegativeAckRedeliveryDelayMs int32 `json:"negativeAckRedeliveryDelayMs,omitempty"` + AutoAck *bool `json:"autoAck,omitempty"` + MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` + ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` + RetainOrdering bool `json:"retainOrdering,omitempty"` + DeadLetterTopic string `json:"deadLetterTopic,omitempty"` RuntimeFlags string `json:"runtimeFlags,omitempty"` SubscriptionName string `json:"subscriptionName,omitempty"` From 7b8ae9f4bb4025ba3f706e8847c1890cd6dc433f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 24 Sep 2021 08:27:01 +0800 Subject: [PATCH 13/15] fix CI --- .ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml | 2 +- .ci/clusters/compute_v1alpha1_function_hpa.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml b/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml index 67cc68b8b..ff337ba35 100644 --- a/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml +++ b/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.0.14 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/.ci/clusters/compute_v1alpha1_function_hpa.yaml b/.ci/clusters/compute_v1alpha1_function_hpa.yaml index 7d1d0bdbb..018ca7ad9 100644 --- a/.ci/clusters/compute_v1alpha1_function_hpa.yaml +++ b/.ci/clusters/compute_v1alpha1_function_hpa.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.0.14 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs From 34ac832852337707ed1c12390a4d9dc4d9161253 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sun, 26 Sep 2021 09:17:38 +0800 Subject: [PATCH 14/15] fix version --- config/manager/kustomization.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 202679334..80fa13418 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: streamnative/function-mesh-operator - newTag: v0.1.6-rc3 + newTag: v0.1.7 From ef660ab5a81a9496f05efcef452a9fe85638cbb1 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sun, 26 Sep 2021 11:03:39 +0800 Subject: [PATCH 15/15] update local test --- api/v1alpha1/function_webhook.go | 4 --- hack/webhook-create-signed-cert.sh | 8 ++--- hack/webhooks/certs/csr.conf | 6 ++-- hack/webhooks/certs/server.csr | 35 +++++++++++---------- hack/webhooks/certs/tls.crt | 35 ++++++++++----------- hack/webhooks/certs/tls.key | 50 +++++++++++++++--------------- 6 files changed, 67 insertions(+), 71 deletions(-) diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 0d1418035..648de623b 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -126,10 +126,6 @@ func (r *Function) ValidateCreate() error { allErrs = append(allErrs, field.Invalid(field.NewPath("name"), r.Name, "function name is not provided")) } - if r.Spec.FuncConfig == nil { - allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("funcConfig"), r.Spec.FuncConfig, "function config is not provided")) - } - if r.Spec.Runtime.Java == nil && r.Spec.Runtime.Python == nil && r.Spec.Runtime.Golang == nil { allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "runtime cannot be empty")) allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "python"), r.Spec.Runtime.Python, "runtime cannot be empty")) diff --git a/hack/webhook-create-signed-cert.sh b/hack/webhook-create-signed-cert.sh index f7e1018c3..c478eaf67 100755 --- a/hack/webhook-create-signed-cert.sh +++ b/hack/webhook-create-signed-cert.sh @@ -4,10 +4,10 @@ set -e usage() { cat <