diff --git a/.ci/clusters/compute_v1alpha1_function.yaml b/.ci/clusters/compute_v1alpha1_function.yaml index d99e7f113..2b9d8035e 100644 --- a/.ci/clusters/compute_v1alpha1_function.yaml +++ b/.ci/clusters/compute_v1alpha1_function.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.1.3 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml b/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml index 9dbd86347..61f2b02cb 100644 --- a/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml +++ b/.ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.1.3 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/.ci/clusters/compute_v1alpha1_function_hpa.yaml b/.ci/clusters/compute_v1alpha1_function_hpa.yaml index 95e8319fc..0568a245c 100644 --- a/.ci/clusters/compute_v1alpha1_function_hpa.yaml +++ b/.ci/clusters/compute_v1alpha1_function_hpa.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-java-sample:2.8.1.3 className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/.ci/clusters/compute_v1alpha1_go_function.yaml b/.ci/clusters/compute_v1alpha1_go_function.yaml index 1301f543c..f5beebdca 100644 --- a/.ci/clusters/compute_v1alpha1_go_function.yaml +++ b/.ci/clusters/compute_v1alpha1_go_function.yaml @@ -6,7 +6,7 @@ metadata: spec: image: streamnative/pulsar-functions-go-sample:2.8.1.3 forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 3 logTopic: persistent://public/default/go-function-logs diff --git a/.ci/clusters/compute_v1alpha1_py_function.yaml b/.ci/clusters/compute_v1alpha1_py_function.yaml index 828495ebb..bd572cd45 100644 --- a/.ci/clusters/compute_v1alpha1_py_function.yaml +++ b/.ci/clusters/compute_v1alpha1_py_function.yaml @@ -7,7 +7,7 @@ spec: image: streamnative/pulsar-functions-python-sample:2.8.1.3 className: exclamation_function.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/py-function-logs diff --git a/Makefile b/Makefile index c9b6e207f..b1694117f 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL) # Image URL to use all building/pushing image targets IMG ?= ${DOCKER_REPO}/function-mesh-operator:v$(VERSION) # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) -CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true" +CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true,preserveUnknownFields=false" # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index d6431423b..2494d8aa2 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -20,7 +20,12 @@ package v1alpha1 import ( "encoding/json" + "fmt" + "strings" + autov2beta2 "k8s.io/api/autoscaling/v2beta2" + + pctlutil "github.com/streamnative/pulsarctl/pkg/pulsar/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -122,18 +127,27 @@ type Runtime struct { Golang *GoRuntime `json:"golang,omitempty"` } +// JavaRuntime contains the java runtime configs +// +kubebuilder:validation:Optional type JavaRuntime struct { + // +kubebuilder:validation:Required Jar string `json:"jar,omitempty"` JarLocation string `json:"jarLocation,omitempty"` ExtraDependenciesDir string `json:"extraDependenciesDir,omitempty"` } +// PythonRuntime contains the python runtime configs +// +kubebuilder:validation:Optional type PythonRuntime struct { + // +kubebuilder:validation:Required Py string `json:"py,omitempty"` PyLocation string `json:"pyLocation,omitempty"` } +// GoRuntime contains the golang runtime configs +// +kubebuilder:validation:Optional type GoRuntime struct { + // +kubebuilder:validation:Required Go string `json:"go,omitempty"` GoLocation string `json:"goLocation,omitempty"` } @@ -195,6 +209,8 @@ type CryptoSecret struct { //AsEnv string `json:"asEnv,omitempty"` } +// SubscribePosition enum type +// +kubebuilder:validation:Enum=latest;earliest type SubscribePosition string const ( @@ -239,6 +255,8 @@ const ( NoAction ReconcileAction = "NoAction" ) +// ProcessGuarantee enum type +// +kubebuilder:validation:Enum=atleast_once;atmost_once;effectively_once type ProcessGuarantee string const ( @@ -249,6 +267,9 @@ const ( DefaultTenant string = "public" DefaultNamespace string = "default" DefaultCluster string = "kubernetes" + + DefaultResourceCPU int64 = 1 + DefaultResourceMemory int64 = 1073741824 ) func validResourceRequirement(requirements corev1.ResourceRequirements) bool { @@ -275,6 +296,12 @@ const ( FunctionComponent string = "function" SourceComponent string = "source" SinkComponent string = "sink" + + PackageURLHTTP string = "http://" + PackageURLHTTPS string = "https://" + PackageURLFunction string = "function://" + PackageURLSource string = "source://" + PackageURLSink string = "sink://" ) // Config represents untyped YAML configuration. @@ -310,3 +337,86 @@ func (c *Config) UnmarshalJSON(data []byte) error { func (c *Config) DeepCopyInto(out *Config) { out.Data = runtime.DeepCopyJSON(c.Data) } + +func validPackageLocation(packageLocation string) error { + if hasPackageTypePrefix(packageLocation) { + err := isValidPulsarPackageURL(packageLocation) + if err != nil { + return err + } + } else { + if !isFunctionPackageURLSupported(packageLocation) { + return fmt.Errorf("invalid function package url %s, supported url (http/https)", packageLocation) + } + } + + return nil +} + +func hasPackageTypePrefix(packageLocation string) bool { + lowerCase := strings.ToLower(packageLocation) + return strings.HasPrefix(lowerCase, PackageURLFunction) || + strings.HasPrefix(lowerCase, PackageURLSource) || + strings.HasPrefix(lowerCase, PackageURLSink) +} + +func isValidPulsarPackageURL(packageLocation string) error { + parts := strings.Split(packageLocation, "://") + if len(parts) != 2 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + if !hasPackageTypePrefix(packageLocation) { + return fmt.Errorf("invalid package name %s", packageLocation) + } + rest := parts[1] + if !strings.Contains(rest, "@") { + rest += "@" + } + packageParts := strings.Split(rest, "@") + if len(packageParts) != 2 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + partsWithoutVersion := strings.Split(packageParts[0], "/") + if len(partsWithoutVersion) != 3 { + return fmt.Errorf("invalid package name %s", packageLocation) + } + return nil +} + +func isFunctionPackageURLSupported(packageLocation string) bool { + // TODO: support file:// schema + lowerCase := strings.ToLower(packageLocation) + return strings.HasPrefix(lowerCase, PackageURLHTTP) || + strings.HasPrefix(lowerCase, PackageURLHTTPS) +} + +func collectAllInputTopics(inputs InputConf) []string { + ret := []string{} + if len(inputs.Topics) > 0 { + ret = append(ret, inputs.Topics...) + } + if inputs.TopicPattern != "" { + ret = append(ret, inputs.TopicPattern) + } + if len(inputs.CustomSerdeSources) > 0 { + for k := range inputs.CustomSerdeSources { + ret = append(ret, k) + } + } + if len(inputs.CustomSchemaSources) > 0 { + for k := range inputs.CustomSchemaSources { + ret = append(ret, k) + } + } + if len(inputs.SourceSpecs) > 0 { + for k := range inputs.SourceSpecs { + ret = append(ret, k) + } + } + return ret +} + +func isValidTopicName(topicName string) error { + _, err := pctlutil.GetTopicName(topicName) + return err +} diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index b58fa4bec..74fe4cc1e 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -26,15 +26,19 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // FunctionSpec defines the desired state of Function +// +kubebuilder:validation:Optional type FunctionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -69,8 +73,11 @@ type FunctionSpec struct { // TODO: windowconfig, customRuntimeOptions? + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run function pods. // default is streamnative/pulsar-functions-java-runner @@ -94,6 +101,7 @@ type FunctionStatus struct { //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector // Function is the Schema for the functions API +// +kubebuilder:pruning:PreserveUnknownFields type Function struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 6d6eaabe9..648de623b 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,13 +48,13 @@ func (r *Function) Default() { functionlog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } if r.Spec.AutoAck == nil { - trueVal := true - r.Spec.AutoAck = &trueVal + r.Spec.AutoAck = new(bool) + *r.Spec.AutoAck = true } if r.Spec.ProcessingGuarantee == "" { @@ -88,17 +89,25 @@ func (r *Function) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Input.TypeClassName == "" { + r.Spec.Input.TypeClassName = "[B" + } + + if r.Spec.Output.TypeClassName == "" { + r.Spec.Output.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -108,69 +117,108 @@ var _ webhook.Validator = &Function{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Function) ValidateCreate() error { - functionlog.Info("validate create", "name", r.Name) + functionlog.Info("validate create function", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Name == "" { + allErrs = append(allErrs, field.Invalid(field.NewPath("name"), r.Name, "function name is not provided")) } - // TODO: verify source conf + if r.Spec.Runtime.Java == nil && r.Spec.Runtime.Python == nil && r.Spec.Runtime.Golang == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "runtime cannot be empty")) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "python"), r.Spec.Runtime.Python, "runtime cannot be empty")) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "golang"), r.Spec.Runtime.Golang, "runtime cannot be empty")) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + if (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil) || + (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Golang != nil) || + (r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) || + (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime"), r.Spec.Runtime, "you can only specify one runtime")) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErrs = validatePythonRuntime(r.Spec.Python, r.Spec.ClassName) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.Timeout != 0 && r.Spec.ProcessingGuarantee != AtleastOnce { - return errors.New("message timeout can only be set for AtleastOnce processing guarantee") + fieldErrs = validateGolangRuntime(r.Spec.Golang) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxMessageRetry > 0 && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("MaxMessageRetries and Effectively once are not compatible") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxMessageRetry <= 0 && r.Spec.DeadLetterTopic != "" { - return errors.New("dead letter topic is set but max message retry is set to infinity") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.RetainKeyOrdering && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("when effectively once processing guarantee is specified, retain Key ordering cannot be set") + fieldErr = validateAutoAck(r.Spec.AutoAck) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.RetainKeyOrdering && r.Spec.RetainOrdering { - return errors.New("only one of retain ordering or retain key ordering can be set") + fieldErr = validateTimeout(r.Spec.Timeout, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.FuncConfig != nil { - _, err := json.Marshal(r.Spec.FuncConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErr = validateRetainKeyOrdering(r.Spec.RetainKeyOrdering, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + fieldErrs = validateRetainOrderingConflicts(r.Spec.RetainKeyOrdering, r.Spec.RetainOrdering) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - // TODO python/golang specific check - return nil + fieldErr = validateFunctionConfig(r.Spec.FuncConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) + } + + fieldErr = validateLogTopic(r.Spec.LogTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + fieldErr = validateDeadLetterTopic(r.Spec.DeadLetterTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Function"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index 9efde8873..f88ee6b16 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -26,16 +26,20 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // SinkSpec defines the desired state of Topic +// +kubebuilder:validation:Optional type SinkSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` ClusterName string `json:"clusterName,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -65,8 +69,10 @@ type SinkSpec struct { Pod PodPolicy `json:"pod,omitempty"` + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run sink pods. // default is streamnative/pulsar-functions-java-runner @@ -89,7 +95,8 @@ type SinkStatus struct { // +kubebuilder:subresource:status //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector -// Topic is the Schema for the sinks API +// Sink is the Schema for the sinks API +// +kubebuilder:pruning:PreserveUnknownFields type Sink struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/sink_webhook.go b/api/v1alpha1/sink_webhook.go index 94e6c4374..7b7b488e4 100644 --- a/api/v1alpha1/sink_webhook.go +++ b/api/v1alpha1/sink_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,13 +48,13 @@ func (r *Sink) Default() { sinklog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } if r.Spec.AutoAck == nil { - trueVal := true - r.Spec.AutoAck = &trueVal + r.Spec.AutoAck = new(bool) + *r.Spec.AutoAck = true } if r.Spec.ProcessingGuarantee == "" { @@ -78,17 +79,21 @@ func (r *Sink) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Input.TypeClassName == "" { + r.Spec.Input.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -98,62 +103,74 @@ var _ webhook.Validator = &Sink{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Sink) ValidateCreate() error { - sinklog.Info("validate create", "name", r.Name) + sinklog.Info("validate create sink", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Spec.SinkConfig == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("sinkConfig"), r.Spec.SinkConfig, "sink config is not provided")) } - // TODO: verify inputConf + if r.Spec.Runtime.Java == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "sink must have java runtime specified")) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Timeout != 0 && r.Spec.ProcessingGuarantee != AtleastOnce { - return errors.New("message timeout can only be set for AtleastOnce processing guarantee") + fieldErr = validateAutoAck(r.Spec.AutoAck) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.MaxMessageRetry > 0 && r.Spec.ProcessingGuarantee == EffectivelyOnce { - return errors.New("MaxMessageRetries and Effectively once are not compatible") + fieldErr = validateTimeout(r.Spec.Timeout, r.Spec.ProcessingGuarantee) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.MaxMessageRetry <= 0 && r.Spec.DeadLetterTopic != "" { - return errors.New("dead letter topic is set but max message retry is set to infinity") + fieldErrs = validateMaxMessageRetry(r.Spec.MaxMessageRetry, r.Spec.ProcessingGuarantee, r.Spec.DeadLetterTopic) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErr = validateSinkConfig(r.Spec.SinkConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SinkConfig != nil { - _, err := json.Marshal(r.Spec.SinkConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + fieldErrs = validateInputOutput(&r.Spec.Input, nil) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - // TODO python/golang specific check + fieldErr = validateDeadLetterTopic(r.Spec.DeadLetterTopic) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } - return nil + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Sink"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 1d22bd853..09dd51d35 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -26,16 +26,19 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // SourceSpec defines the desired state of Source +// +kubebuilder:validation:Optional type SourceSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file + // +kubebuilder:validation:Required Name string `json:"name,omitempty"` ClassName string `json:"className,omitempty"` Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector - Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Required + Replicas *int32 `json:"replicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. @@ -54,8 +57,11 @@ type SourceSpec struct { ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"` Pod PodPolicy `json:"pod,omitempty"` + // +kubebuilder:validation:Required Messaging `json:",inline"` - Runtime `json:",inline"` + + // +kubebuilder:validation:Required + Runtime `json:",inline"` // Image is the container image used to run source pods. // default is streamnative/pulsar-functions-java-runner @@ -79,6 +85,7 @@ type SourceStatus struct { //+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector // Source is the Schema for the sources API +// +kubebuilder:pruning:PreserveUnknownFields type Source struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index a9ce5934e..6ebed1ce8 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -18,8 +18,9 @@ package v1alpha1 import ( - "encoding/json" - "errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -47,15 +48,10 @@ func (r *Source) Default() { sourcelog.Info("default", "name", r.Name) if r.Spec.Replicas == nil { - zeroVal := int32(0) - r.Spec.Replicas = &zeroVal + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 } - //if r.Spec.AutoAck == nil { - // trueVal := true - // r.Spec.AutoAck = &trueVal - //} - if r.Spec.ProcessingGuarantee == "" { r.Spec.ProcessingGuarantee = AtleastOnce } @@ -78,11 +74,11 @@ func (r *Source) Default() { if r.Spec.Resources.Requests != nil { if r.Spec.Resources.Requests.Cpu() == nil { - r.Spec.Resources.Requests.Cpu().Set(int64(1)) + r.Spec.Resources.Requests.Cpu().Set(DefaultResourceCPU) } if r.Spec.Resources.Requests.Memory() == nil { - r.Spec.Resources.Requests.Memory().Set(int64(1073741824)) + r.Spec.Resources.Requests.Memory().Set(DefaultResourceMemory) } } @@ -104,6 +100,10 @@ func (r *Source) Default() { if r.Spec.Resources.Limits == nil { paddingResourceLimit(&r.Spec.Resources) } + + if r.Spec.Output.TypeClassName == "" { + r.Spec.Output.TypeClassName = "[B" + } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -113,49 +113,54 @@ var _ webhook.Validator = &Source{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type func (r *Source) ValidateCreate() error { - sourcelog.Info("validate create", "name", r.Name) + sourcelog.Info("validate create source", "name", r.Name) + var allErrs field.ErrorList + var fieldErr *field.Error + var fieldErrs []*field.Error - if r.Spec.Java != nil { - if r.Spec.ClassName == "" { - return errors.New("class name cannot be empty") - } + if r.Spec.SourceConfig == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("sourceConfig"), r.Spec.SourceConfig, "source config is not provided")) + } + + if r.Spec.Runtime.Java == nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime", "java"), r.Spec.Runtime.Java, "source must have java runtime specified")) } - // TODO: verify inputConf + fieldErrs = validateJavaRuntime(r.Spec.Java, r.Spec.ClassName) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) + } - // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if *r.Spec.Replicas == 0 { - return errors.New("replicas cannot be zero") + fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.MaxReplicas != nil && *r.Spec.Replicas > *r.Spec.MaxReplicas { - return errors.New("maxReplicas must be greater than or equal to replicas") + fieldErr = validateResourceRequirement(r.Spec.Resources) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if !validResourceRequirement(r.Spec.Resources) { - return errors.New("resource requirement is invalid") + fieldErr = validateSourceConfig(r.Spec.SourceConfig) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil { - return errors.New("must specify a runtime from java, python or golang") + fieldErr = validateSecretsMap(r.Spec.SecretsMap) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) } - if r.Spec.SourceConfig != nil { - _, err := json.Marshal(r.Spec.SourceConfig) - if err != nil { - return errors.New("provided config is wrong: " + err.Error()) - } + fieldErrs = validateInputOutput(nil, &r.Spec.Output) + if len(fieldErrs) > 0 { + allErrs = append(allErrs, fieldErrs...) } - if r.Spec.SecretsMap != nil { - _, err := json.Marshal(r.Spec.SecretsMap) - if err != nil { - return errors.New("provided secrets map is wrong: " + err.Error()) - } + if len(allErrs) == 0 { + return nil } - // TODO python/golang specific check - return nil + return apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "Source"}, r.Name, allErrs) } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go new file mode 100644 index 000000000..1d973a910 --- /dev/null +++ b/api/v1alpha1/validate.go @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v1alpha1 + +import ( + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/util/validation/field" +) + +func validateJavaRuntime(java *JavaRuntime, className string) []*field.Error { + var allErrs field.ErrorList + if java != nil { + if className == "" { + e := field.Invalid(field.NewPath("spec").Child("classname"), className, "class name cannot be empty") + allErrs = append(allErrs, e) + } + if java.Jar == "" { + e := field.Invalid(field.NewPath("spec").Child("java", "jar"), java.Jar, "jar cannot be empty in java runtime") + allErrs = append(allErrs, e) + } + if java.JarLocation != "" { + err := validPackageLocation(java.JarLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("java", "jarLocation"), java.JarLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validatePythonRuntime(python *PythonRuntime, className string) []*field.Error { + var allErrs field.ErrorList + if python != nil { + if className == "" { + e := field.Invalid(field.NewPath("spec").Child("classname"), className, "class name cannot be empty") + allErrs = append(allErrs, e) + } + if python.Py == "" { + e := field.Invalid(field.NewPath("spec").Child("python", "py"), python.Py, "py cannot be empty in python runtime") + allErrs = append(allErrs, e) + } + if python.PyLocation != "" { + err := validPackageLocation(python.PyLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("python", "pyLocation"), python.PyLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validateGolangRuntime(golang *GoRuntime) []*field.Error { + var allErrs field.ErrorList + if golang != nil { + if golang.Go == "" { + e := field.Invalid(field.NewPath("spec").Child("golang", "go"), golang.Go, "go cannot be empty in golang runtime") + allErrs = append(allErrs, e) + } + if golang.GoLocation != "" { + err := validPackageLocation(golang.GoLocation) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("golang", "goLocation"), golang.GoLocation, err.Error()) + allErrs = append(allErrs, e) + } + } + } + return allErrs +} + +func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error { + var allErrs field.ErrorList + // TODO: allow 0 replicas, currently hpa's min value has to be 1 + if replicas == nil { + e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil") + allErrs = append(allErrs, e) + } + + if replicas != nil && *replicas <= 0 { + e := field.Invalid(field.NewPath("spec").Child("replicas"), *replicas, "replicas cannot be zero or negative") + allErrs = append(allErrs, e) + } + + if maxReplicas != nil && replicas != nil && *replicas > *maxReplicas { + e := field.Invalid(field.NewPath("spec").Child("maxReplicas"), *maxReplicas, "maxReplicas must be greater than or equal to replicas") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateResourceRequirement(requirements corev1.ResourceRequirements) *field.Error { + if !validResourceRequirement(requirements) { + return field.Invalid(field.NewPath("spec").Child("resources"), requirements, "resource requirement is invalid") + } + return nil +} + +func validateTimeout(timeout int32, processingGuarantee ProcessGuarantee) *field.Error { + if timeout != 0 && processingGuarantee == EffectivelyOnce { + return field.Invalid(field.NewPath("spec").Child("timeout"), timeout, "message timeout can only be set for AtleastOnce processing guarantee") + } + return nil +} + +func validateMaxMessageRetry(maxMessageRetry int32, processingGuarantee ProcessGuarantee, deadLetterTopic string) []*field.Error { + var allErrs field.ErrorList + if maxMessageRetry > 0 && processingGuarantee == EffectivelyOnce { + e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "MaxMessageRetries and Effectively once are not compatible") + allErrs = append(allErrs, e) + } + + if maxMessageRetry <= 0 && deadLetterTopic != "" { + e := field.Invalid(field.NewPath("spec").Child("maxMessageRetry"), maxMessageRetry, "dead letter topic is set but max message retry is set to infinity") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateRetainKeyOrdering(retainKeyOrdering bool, processingGuarantee ProcessGuarantee) *field.Error { + if retainKeyOrdering && processingGuarantee == EffectivelyOnce { + return field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "when effectively once processing guarantee is specified, retain Key ordering cannot be set") + } + return nil +} + +func validateRetainOrderingConflicts(retainKeyOrdering bool, retainOrdering bool) []*field.Error { + var allErrs field.ErrorList + if retainKeyOrdering && retainOrdering { + e := field.Invalid(field.NewPath("spec").Child("retainKeyOrdering"), retainKeyOrdering, "only one of retain ordering or retain key ordering can be set") + allErrs = append(allErrs, e) + e = field.Invalid(field.NewPath("spec").Child("retainOrdering"), retainOrdering, "only one of retain ordering or retain key ordering can be set") + allErrs = append(allErrs, e) + } + return allErrs +} + +func validateFunctionConfig(config *Config) *field.Error { + if config != nil { + _, err := config.MarshalJSON() + if err != nil { + return field.Invalid(field.NewPath("spec").Child("funcConfig"), config, "function config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSinkConfig(config *Config) *field.Error { + if config != nil { + _, err := config.MarshalJSON() + if err != nil { + return field.Invalid(field.NewPath("spec").Child("sinkConfig"), config, "sink config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSourceConfig(config *Config) *field.Error { + if config != nil { + _, err := config.MarshalJSON() + if err != nil { + return field.Invalid(field.NewPath("spec").Child("sourceConfig"), config, "source config is invalid: "+err.Error()) + } + } + return nil +} + +func validateSecretsMap(secrets map[string]SecretRef) *field.Error { + if secrets != nil { + _, err := json.Marshal(secrets) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("secretsMap"), secrets, "secrets map is invalid: "+err.Error()) + } + } + return nil +} + +func validateInputOutput(input *InputConf, output *OutputConf) []*field.Error { + var allErrs field.ErrorList + allInputTopics := []string{} + if input != nil { + allInputTopics = collectAllInputTopics(*input) + if len(allInputTopics) == 0 { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + "No input topic(s) specified for the function") + allErrs = append(allErrs, e) + } + + for _, topic := range allInputTopics { + err := isValidTopicName(topic) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + fmt.Sprintf("Input topic %s is invalid", topic)) + allErrs = append(allErrs, e) + } + } + + for topicName, conf := range input.SourceSpecs { + if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName)) + allErrs = append(allErrs, e) + } + + if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName)) + allErrs = append(allErrs, e) + } + } + } + + if output != nil { + if output.Topic != "" { + err := isValidTopicName(output.Topic) + if err != nil { + e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic, + fmt.Sprintf("Output topic %s is invalid", output.Topic)) + allErrs = append(allErrs, e) + } + for _, v := range allInputTopics { + if v == output.Topic { + e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic, + fmt.Sprintf("Output topic %s is also being used as an input topic (topics must be one or the other)", + output.Topic)) + allErrs = append(allErrs, e) + } + } + if output.ProducerConf != nil && output.ProducerConf.CryptoConfig != nil { + if output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "cryptoKeyReaderClassName"), + output.ProducerConf.CryptoConfig.CryptoKeyReaderClassName, "cryptoKeyReader class name required") + allErrs = append(allErrs, e) + } + + if len(output.ProducerConf.CryptoConfig.EncryptionKeys) == 0 { + e := field.Invalid(field.NewPath("spec").Child("output", "producerConf", "cryptoConfig", "encryptionKeys"), + output.ProducerConf.CryptoConfig.EncryptionKeys, "must provide encryption key name for crypto key reader") + allErrs = append(allErrs, e) + } + } + } + } + + return allErrs +} + +func validateLogTopic(logTopic string) *field.Error { + if logTopic != "" { + err := isValidTopicName(logTopic) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("logTopic"), logTopic, fmt.Sprintf("Log topic %s is invalid", logTopic)) + } + } + return nil +} + +func validateDeadLetterTopic(deadLetterTopic string) *field.Error { + if deadLetterTopic != "" { + err := isValidTopicName(deadLetterTopic) + if err != nil { + return field.Invalid(field.NewPath("spec").Child("deadLetterTopic"), deadLetterTopic, fmt.Sprintf("DeadLetter topic %s is invalid", deadLetterTopic)) + } + } + return nil +} + +func validateAutoAck(autoAck *bool) *field.Error { + if autoAck == nil { + return field.Invalid(field.NewPath("spec").Child("autoAck"), autoAck, "autoAck cannot be nil") + } + return nil +} diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 4d54c48e5..aab626cf1 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -14,6 +14,7 @@ spec: listKind: FunctionMeshList plural: functionmeshes singular: functionmesh + preserveUnknownFields: false scope: Namespaced subresources: status: {} @@ -2443,6 +2444,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2462,6 +2467,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2492,6 +2498,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -4869,6 +4878,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -4888,6 +4901,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -4921,6 +4935,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -7275,6 +7292,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 72a11da17..71b740140 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -14,6 +14,7 @@ spec: listKind: FunctionList plural: functions singular: function + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -2444,6 +2445,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2463,6 +2468,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2493,6 +2499,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 38edca51d..563095745 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -14,6 +14,7 @@ spec: listKind: SinkList plural: sinks singular: sink + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -2378,6 +2379,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2397,6 +2402,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2430,6 +2436,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 9a9e22c3a..1cb561aed 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -14,6 +14,7 @@ spec: listKind: SourceList plural: sources singular: source + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -2355,6 +2356,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index e4e7c0e80..c944f5514 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -11,18 +11,18 @@ resources: patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD -#- patches/webhook_in_functionmeshes.yaml -#- patches/webhook_in_functions.yaml -#- patches/webhook_in_sources.yaml -#- patches/webhook_in_sinks.yaml +- patches/webhook_in_functionmeshes.yaml +- patches/webhook_in_functions.yaml +- patches/webhook_in_sources.yaml +- patches/webhook_in_sinks.yaml # +kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD -#- patches/cainjection_in_functionmeshes.yaml -#- patches/cainjection_in_functions.yaml -#- patches/cainjection_in_sources.yaml -#- patches/cainjection_in_sinks.yaml +- patches/cainjection_in_functionmeshes.yaml +- patches/cainjection_in_functions.yaml +- patches/cainjection_in_sources.yaml +- patches/cainjection_in_sinks.yaml # +kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 231f34c60..80fa13418 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: streamnative/function-mesh - newTag: v0.1.1 + newName: streamnative/function-mesh-operator + newTag: v0.1.7 diff --git a/config/samples/compute_v1alpha1_function.yaml b/config/samples/compute_v1alpha1_function.yaml index 7e285f69d..9a97d98e8 100644 --- a/config/samples/compute_v1alpha1_function.yaml +++ b/config/samples/compute_v1alpha1_function.yaml @@ -6,7 +6,7 @@ metadata: spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_function_crypto.yaml b/config/samples/compute_v1alpha1_function_crypto.yaml index f78728e7b..9755472f9 100644 --- a/config/samples/compute_v1alpha1_function_crypto.yaml +++ b/config/samples/compute_v1alpha1_function_crypto.yaml @@ -6,7 +6,7 @@ metadata: spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_function_key_based_batcher.yaml b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml index c50fe7d13..ada9e13c0 100644 --- a/config/samples/compute_v1alpha1_function_key_based_batcher.yaml +++ b/config/samples/compute_v1alpha1_function_key_based_batcher.yaml @@ -7,7 +7,7 @@ spec: className: org.apache.pulsar.functions.api.examples.ExclamationFunction image: streamnative/pulsar-all:2.7.1 forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/config/samples/compute_v1alpha1_go_function.yaml b/config/samples/compute_v1alpha1_go_function.yaml index a462d1d5d..f2d6d4d94 100644 --- a/config/samples/compute_v1alpha1_go_function.yaml +++ b/config/samples/compute_v1alpha1_go_function.yaml @@ -5,7 +5,7 @@ metadata: namespace: default spec: forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/go-function-logs diff --git a/config/samples/compute_v1alpha1_py_function.yaml b/config/samples/compute_v1alpha1_py_function.yaml index 45bc12121..da6a14533 100644 --- a/config/samples/compute_v1alpha1_py_function.yaml +++ b/config/samples/compute_v1alpha1_py_function.yaml @@ -6,7 +6,7 @@ metadata: spec: className: exclamation_function.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 1 logTopic: persistent://public/default/py-function-logs diff --git a/hack/webhook-create-signed-cert.sh b/hack/webhook-create-signed-cert.sh new file mode 100755 index 000000000..c478eaf67 --- /dev/null +++ b/hack/webhook-create-signed-cert.sh @@ -0,0 +1,131 @@ +#!/bin/bash + +set -e + +usage() { + cat <> "${tmpdir}"/csr.conf +[req] +req_extensions = v3_req +distinguished_name = req_distinguished_name +[req_distinguished_name] +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = ${service} +DNS.2 = ${service}.${namespace} +DNS.3 = ${service}.${namespace}.svc +EOF + +openssl genrsa -out "${tmpdir}"/server-key.pem 2048 +openssl req -new -key "${tmpdir}"/server-key.pem -subj "/CN=${service}.${namespace}.svc" -out "${tmpdir}"/server.csr -config "${tmpdir}"/csr.conf + +# clean-up any previously created CSR for our service. Ignore errors if not present. +kubectl delete csr ${csrName} 2>/dev/null || true + +# create server cert/key CSR and send to k8s API +cat <&2 + exit 1 +fi +echo "${serverCert}" | openssl base64 -d -A -out "${tmpdir}"/server-cert.pem + + +# create the secret with CA cert and server cert/key +kubectl create secret generic ${secret} \ + --from-file=key.pem="${tmpdir}"/server-key.pem \ + --from-file=cert.pem="${tmpdir}"/server-cert.pem \ + --dry-run -o yaml | + kubectl -n ${namespace} apply -f - diff --git a/hack/webhooks/certs/csr.conf b/hack/webhooks/certs/csr.conf new file mode 100644 index 000000000..ccca3d554 --- /dev/null +++ b/hack/webhooks/certs/csr.conf @@ -0,0 +1,13 @@ +[req] +req_extensions = v3_req +distinguished_name = req_distinguished_name +[req_distinguished_name] +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = function-mesh-webhook-service +DNS.2 = function-mesh-webhook-service.default +DNS.3 = function-mesh-webhook-service.default.svc diff --git a/hack/webhooks/certs/server.csr b/hack/webhooks/certs/server.csr new file mode 100644 index 000000000..7e6190066 --- /dev/null +++ b/hack/webhooks/certs/server.csr @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIIDNzCCAh8CAQAwNDEyMDAGA1UEAwwpZnVuY3Rpb24tbWVzaC13ZWJob29rLXNl +cnZpY2UuZGVmYXVsdC5zdmMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB +AQDEI3fQ3MR66uojvk5xdscuFPRit+9jeuyEGDiV9ctir/od09JPzMHPY2/G01aT +03emqiWzqHsiNMKb/UMJR7EEDooZ/SjY9CJECX6TPYQTnCWY29IUjj7H1secw2kJ +C8s5QQiEA/ZZAyCWTNrl0QMVxxTkEwSWPffE/iiW32cjOpKGVFM3b83QrPHAloE9 +q7NzyToylRzr/srtQa22LX3rnKhDzojYnBL+glKAXFQunkP2eWvy8sFnLT2p1/qj +LOElHtBG7Fu14f4jVjiUbcUzRhIYz9I5K1exYbuDP8nyPKywY/LoPFrNuOuXRlNp +3GFLU9ZJxhGHHvbdxuJ29xg7AgMBAAGggb0wgboGCSqGSIb3DQEJDjGBrDCBqTAJ +BgNVHRMEAjAAMAsGA1UdDwQEAwIF4DATBgNVHSUEDDAKBggrBgEFBQcDATB6BgNV +HREEczBxgh1mdW5jdGlvbi1tZXNoLXdlYmhvb2stc2VydmljZYIlZnVuY3Rpb24t +bWVzaC13ZWJob29rLXNlcnZpY2UuZGVmYXVsdIIpZnVuY3Rpb24tbWVzaC13ZWJo +b29rLXNlcnZpY2UuZGVmYXVsdC5zdmMwDQYJKoZIhvcNAQELBQADggEBAGDxqszS +FpZTVhstxho4zMi6p3HmdP50vg3rhwVwURjP1jkDFPHBcDzPZj2M1gOJXIY+Kv5Z +jto11cjeL+wL+3pB8o5E2f8hzM/+OlOahZpR+MMEvQBh1CXYl4h7jf5tVQLNrK47 +y4CR7EeXnXAZw4rPHjsxaLkle2zxX3co+PdncqCCAcziGEur4xrTE79x9E5YRSKX +k7yCNuPSiwlm8/FQ4eRbGsXqbsk6QtGedRJa4GsBDLmW/fbH33HmEV/vYZQYXw9T +7FkCGDnCx2+0r1CQ+3cyoq0zmSHwB2gA0sKGiQLTtJjkC/pcnH+aD+bJa90vca83 +etqOavSaW6zM1ps= +-----END CERTIFICATE REQUEST----- diff --git a/hack/webhooks/certs/tls.crt b/hack/webhooks/certs/tls.crt new file mode 100644 index 000000000..bb97c25d4 --- /dev/null +++ b/hack/webhooks/certs/tls.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHzCCAgegAwIBAgIRALdmRdhMqX+5wWlEXiFywO0wDQYJKoZIhvcNAQELBQAw +ADAeFw0yMTA5MjYwMjA4MzlaFw0yMTEyMjUwMjA4MzlaMAAwggEiMA0GCSqGSIb3 +DQEBAQUAA4IBDwAwggEKAoIBAQCwLGK6jbbZBRCTcNeZayb3T9pBcoVCODEpeXvc +wQBqsOjS6JXLY1nKuGnrYP4n707Narotb7WI039CXw9K/lzjuYdnJxNc4vHIMUbP +Ch20CN+fexoQt8ZsT3wtWRqMKKE9Pbkc8bu/ybR1L4FnYjxmJlh1XSO2WBVc8dik +aj2JClbBBzz+HuTI9LbFQt5YTMmfJPBoMZYHqy02p/Qm6xpj5m9FxiF7UrWqnsIh +XmhJia47tlf5nllFxHGMnNou6fMcKRtOeDg24Tzdcb5GDa1DJ4mOUVSIv52aWwCT +FFQCfJ0cjSKE5enh9MYASZq+2w2Ddv/LYzRLMXrQVTbkMbq/AgMBAAGjgZMwgZAw +DgYDVR0PAQH/BAQDAgWgMAwGA1UdEwEB/wQCMAAwcAYDVR0RAQH/BGYwZIIpZnVu +Y3Rpb24tbWVzaC13ZWJob29rLXNlcnZpY2UuZGVmYXVsdC5zdmOCN2Z1bmN0aW9u +LW1lc2gtd2ViaG9vay1zZXJ2aWNlLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWww +DQYJKoZIhvcNAQELBQADggEBADufPL3ph7pHjGNtJLsn7kADgMrDNRdR4yfUp7GH +dSqlAtZICXY31DwHCHWjFbLzAUjcvVZuyKKJyZnMc5camwS4py59yLrvdZ0vjZqR +5zEZUpF1YUe/0E4Qp4+AOWCQOm30iNk/QkgHkJGmPKGeO/Ch6kMGZ9/YbLumdjms +Q6aDHTCSYGTTPNOkRx4dVHyjl7sSusxFJbZD3uVQxgK1/P6X0e1inhQUfpynGA/t +oZtWzsjmvPlTbjrf7pboo5fASzNuMcuEWBRH/yDgPtxZtDczccf10yiZq1rJxlVm +ojBAJRlkJnT4DvocBikZzJFgO8B3xZnqc36EAnXyc55e+P8= +-----END CERTIFICATE----- diff --git a/hack/webhooks/certs/tls.key b/hack/webhooks/certs/tls.key new file mode 100644 index 000000000..67758830d --- /dev/null +++ b/hack/webhooks/certs/tls.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAsCxiuo222QUQk3DXmWsm90/aQXKFQjgxKXl73MEAarDo0uiV +y2NZyrhp62D+J+9OzWq6LW+1iNN/Ql8PSv5c47mHZycTXOLxyDFGzwodtAjfn3sa +ELfGbE98LVkajCihPT25HPG7v8m0dS+BZ2I8ZiZYdV0jtlgVXPHYpGo9iQpWwQc8 +/h7kyPS2xULeWEzJnyTwaDGWB6stNqf0JusaY+ZvRcYhe1K1qp7CIV5oSYmuO7ZX ++Z5ZRcRxjJzaLunzHCkbTng4NuE83XG+Rg2tQyeJjlFUiL+dmlsAkxRUAnydHI0i +hOXp4fTGAEmavtsNg3b/y2M0SzF60FU25DG6vwIDAQABAoIBAF5MM8BVMGTzQA8b +XFujLiU8Wb+kU83BFBd2DiV4LQfuj4Csu9k04ZAPq6owrXmkrxJmqSa+33VQd5KI +HUBv7Dp8SQyers+OLgqQzazuZj3CbigIFUiThaLd3MLNKdeRu9Ry4E2s2mjqxyve +dK5ot5mZtsmDRCVK+oxYazR+JVffDHw5/W03IYKe6UIhupcgwL9Jk9b61F4jGFpH +v4FHIJq5SimljCxbf2mBz4YU4LW85J1HIymKk6k1//aGWVU2YkS6QGzJ4O4OWC0t +AmEBQxXf23zC0B1G440WikCFOKJMr6ptmm+XJ5ZXg0SuUs0MPwCuhiUCTFd6U0lI +NdIzKtECgYEA0rM0GFwVxcTqZUfkHoQrsWKwahDAH8Y5UuRmcMScZ/5FrpOGq1KQ +C1GWu0LS3w7SxEe8PoUHyykSYEVZ6NfBorLcTXhS5fVqDyI4VtvXTIRz+bvzDzCG +SR6vXMg9QvNfvmC9HhmVElxqTtOFccLW5xSYN/8AS6EI7u9IEpL4SskCgYEA1gzb +pcfmYNYpxdTGr9NYjTojr5tM+kX5KlE0TGbv2RkvIkabdYhE8LwzIKOAuOMKngK/ +4a46Z3bkSFCMoTpvcx5xSsC9NJkPJ2Z861Zdb2Xfcr4k0OwE7PiYAZc4Hn9wOda1 +2Snk1TvaqCCPW12nP6PoKq4Lzo/hoqEW9OhelUcCgYEAvBHdVEkN3iPwnUrYYizT +21gd4+7jrvj+vfDFR68XdPPQOw3PzSi7Dn9bLfdtxlCy7SSi8KNJ25vjnS8KG/5K +BvHkFvpp9H1kl/GRnGZK/S/VXXrYcBx+INJU8VqC5DncolAzPqTTHTqEYawIlwy1 +SjplzMW++8LX+H55NKsfGZECgYBIFgZ22057uERTKqhLGHsO/NB92GhhNOWH2dcq +D9SgoXmdKng6ac0F4eu6BzZXjabKthRW9a0XoROaQv5JYfucolj3Gtfdedp/o2VY +gDpxRLau1thhP8sc30+Z4Yp1wudRozyUySYWpdSqoGSHGZA0v4d1sC2rwAr2ERhB +HGyWCwKBgQCGyOA69dcCLFuEDmOu3DwYAmXlAmCFyKlMPgvZbczkblzYquIWzjxD +Q84CBIK70zJEa2Dh3DVEH2F4qF6K6toMj7lextlIvtQOm/nuxy84Uktkssssbmll +sxIeMzQJbRVLY2sCucvaFY/YP61Cgm00/XWFJfT9jOy8agumSP0L4Q== +-----END RSA PRIVATE KEY----- diff --git a/images/README.md b/images/README.md index a072ea33d..554b84c38 100644 --- a/images/README.md +++ b/images/README.md @@ -62,7 +62,7 @@ spec: image: streamnative/pulsar-functions-java-runner:2.7.1 # using java function runner className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs @@ -120,7 +120,7 @@ spec: image: streamnative/example-function-image:latest # using function image here className: org.apache.pulsar.functions.api.examples.ExclamationFunction forwardSourceMessageProperty: true - MaxPendingAsyncRequests: 1000 + maxPendingAsyncRequests: 1000 replicas: 1 maxReplicas: 5 logTopic: persistent://public/default/logging-function-logs diff --git a/license_test.go b/license_test.go index 06b9d8241..0f3334ec6 100644 --- a/license_test.go +++ b/license_test.go @@ -70,7 +70,8 @@ var skip = map[string]bool{ ".github/workflows/project.yml": true, ".github/workflows/release.yml": true, ".github/workflows/release-node.yml": true, - ".github/release-drafter.yml": true, + "hack/webhook-create-signed-cert.sh": true, + "hack/webhooks/certs/csr.conf": true, } func TestLicense(t *testing.T) { diff --git a/main.go b/main.go index 2788d1c73..8e98c05a5 100644 --- a/main.go +++ b/main.go @@ -48,6 +48,7 @@ func init() { func main() { var metricsAddr string var leaderElectionID string + var certDir string var enableLeaderElection bool flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&leaderElectionID, "leader-election-id", "a3f45fce.functionmesh.io", @@ -55,6 +56,8 @@ func main() { flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.StringVar(&certDir, "cert-dir", "", + "CertDir is the directory that contains the server key and certificate.\n\tif not set, webhook server would look up the server key and certificate in\n\t{TempDir}/k8s-webhook-server/serving-certs. The server key and certificate\n\tmust be named tls.key and tls.crt, respectively.") flag.Parse() ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -65,6 +68,7 @@ func main() { Port: 9443, LeaderElection: enableLeaderElection, LeaderElectionID: leaderElectionID, + CertDir: certDir, }) if err != nil { setupLog.Error(err, "unable to start manager") diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 6b73066d9..3413c258c 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -2,16 +2,26 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: functionmeshes.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: FunctionMesh listKind: FunctionMeshList plural: functionmeshes singular: functionmesh + preserveUnknownFields: false scope: Namespaced subresources: status: {} @@ -2441,6 +2451,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -2460,6 +2474,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2490,6 +2505,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -4867,6 +4885,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -4886,6 +4908,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -4919,6 +4942,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -7273,6 +7299,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -7398,16 +7428,26 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: functions.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Function listKind: FunctionList plural: functions singular: function + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -9838,6 +9878,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -9857,6 +9901,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -9887,6 +9932,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -9954,16 +10002,26 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: sinks.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Sink listKind: SinkList plural: sinks singular: sink + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -12328,6 +12386,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: @@ -12347,6 +12409,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -12380,6 +12443,9 @@ spec: subscriptionName: type: string subscriptionPosition: + enum: + - latest + - earliest type: string tenant: type: string @@ -12447,16 +12513,26 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) controller-gen.kubebuilder.io/version: v0.2.4 creationTimestamp: null name: sources.compute.functionmesh.io spec: + conversion: + strategy: Webhook + webhookClientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /convert group: compute.functionmesh.io names: kind: Source listKind: SourceList plural: sources singular: source + preserveUnknownFields: false scope: Namespaced subresources: scale: @@ -14798,6 +14874,10 @@ spec: type: array type: object processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once type: string pulsar: properties: diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java index 7d4616a5c..05211487b 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java @@ -380,7 +380,7 @@ public static FunctionConfig createFunctionConfigFromV1alpha1Function(String ten functionConfig.setParallelism(v1alpha1FunctionSpec.getReplicas()); if (v1alpha1FunctionSpec.getProcessingGuarantee() != null) { functionConfig.setProcessingGuarantees( - CommonUtil.convertProcessingGuarantee(v1alpha1FunctionSpec.getProcessingGuarantee())); + CommonUtil.convertProcessingGuarantee(v1alpha1FunctionSpec.getProcessingGuarantee().getValue())); } CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions(); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java index f2062b111..dc06fd20f 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java @@ -310,7 +310,8 @@ public static SinkConfig createSinkConfigFromV1alpha1Sink( } sinkConfig.setParallelism(v1alpha1SinkSpec.getReplicas()); if (v1alpha1SinkSpec.getProcessingGuarantee() != null) { - sinkConfig.setProcessingGuarantees(CommonUtil.convertProcessingGuarantee(v1alpha1SinkSpec.getProcessingGuarantee())); + sinkConfig.setProcessingGuarantees( + CommonUtil.convertProcessingGuarantee(v1alpha1SinkSpec.getProcessingGuarantee().getValue())); } CustomRuntimeOptions customRuntimeOptions = new CustomRuntimeOptions(); diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index 522178a11..79cceaf81 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -268,7 +268,7 @@ public static SourceConfig createSourceConfigFromV1alpha1Source(String tenant, S sourceConfig.setParallelism(v1alpha1SourceSpec.getReplicas()); if (v1alpha1SourceSpec.getProcessingGuarantee() != null) { sourceConfig.setProcessingGuarantees( - CommonUtil.convertProcessingGuarantee(v1alpha1SourceSpec.getProcessingGuarantee())); + CommonUtil.convertProcessingGuarantee(v1alpha1SourceSpec.getProcessingGuarantee().getValue())); } sourceConfig.setClassName(v1alpha1SourceSpec.getClassName());