Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/clusters/compute_v1alpha1_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
image: streamnative/pulsar-functions-java-sample:2.8.1.3
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 5
logTopic: persistent://public/default/logging-function-logs
Expand Down
2 changes: 1 addition & 1 deletion .ci/clusters/compute_v1alpha1_function_builtin_hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
image: streamnative/pulsar-functions-java-sample:2.8.1.3
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 5
logTopic: persistent://public/default/logging-function-logs
Expand Down
2 changes: 1 addition & 1 deletion .ci/clusters/compute_v1alpha1_function_hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
image: streamnative/pulsar-functions-java-sample:2.8.1.3
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 5
logTopic: persistent://public/default/logging-function-logs
Expand Down
2 changes: 1 addition & 1 deletion .ci/clusters/compute_v1alpha1_go_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
spec:
image: streamnative/pulsar-functions-go-sample:2.8.1.3
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 3
logTopic: persistent://public/default/go-function-logs
Expand Down
2 changes: 1 addition & 1 deletion .ci/clusters/compute_v1alpha1_py_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ spec:
image: streamnative/pulsar-functions-python-sample:2.8.1.3
className: exclamation_function.ExclamationFunction
forwardSourceMessageProperty: true
MaxPendingAsyncRequests: 1000
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 1
logTopic: persistent://public/default/py-function-logs
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL)
# Image URL to use all building/pushing image targets
IMG ?= ${DOCKER_REPO}/function-mesh-operator:v$(VERSION)
# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true"
CRD_OPTIONS ?= "crd:maxDescLen=0,trivialVersions=true,preserveUnknownFields=false"

# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
ifeq (,$(shell go env GOBIN))
Expand Down
110 changes: 110 additions & 0 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ package v1alpha1
import (
"encoding/json"

"fmt"
"strings"

autov2beta2 "k8s.io/api/autoscaling/v2beta2"

pctlutil "github.com/streamnative/pulsarctl/pkg/pulsar/utils"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -122,18 +127,27 @@ type Runtime struct {
Golang *GoRuntime `json:"golang,omitempty"`
}

// JavaRuntime contains the java runtime configs
// +kubebuilder:validation:Optional
type JavaRuntime struct {
// +kubebuilder:validation:Required
Jar string `json:"jar,omitempty"`
JarLocation string `json:"jarLocation,omitempty"`
ExtraDependenciesDir string `json:"extraDependenciesDir,omitempty"`
}

// PythonRuntime contains the python runtime configs
// +kubebuilder:validation:Optional
type PythonRuntime struct {
// +kubebuilder:validation:Required
Py string `json:"py,omitempty"`
PyLocation string `json:"pyLocation,omitempty"`
}

// GoRuntime contains the golang runtime configs
// +kubebuilder:validation:Optional
type GoRuntime struct {
// +kubebuilder:validation:Required
Go string `json:"go,omitempty"`
GoLocation string `json:"goLocation,omitempty"`
}
Expand Down Expand Up @@ -195,6 +209,8 @@ type CryptoSecret struct {
//AsEnv string `json:"asEnv,omitempty"`
}

// SubscribePosition enum type
// +kubebuilder:validation:Enum=latest;earliest
type SubscribePosition string

const (
Expand Down Expand Up @@ -239,6 +255,8 @@ const (
NoAction ReconcileAction = "NoAction"
)

// ProcessGuarantee enum type
// +kubebuilder:validation:Enum=atleast_once;atmost_once;effectively_once
type ProcessGuarantee string

const (
Expand All @@ -249,6 +267,9 @@ const (
DefaultTenant string = "public"
DefaultNamespace string = "default"
DefaultCluster string = "kubernetes"

DefaultResourceCPU int64 = 1
DefaultResourceMemory int64 = 1073741824
)

func validResourceRequirement(requirements corev1.ResourceRequirements) bool {
Expand All @@ -275,6 +296,12 @@ const (
FunctionComponent string = "function"
SourceComponent string = "source"
SinkComponent string = "sink"

PackageURLHTTP string = "http://"
PackageURLHTTPS string = "https://"
PackageURLFunction string = "function://"
PackageURLSource string = "source://"
PackageURLSink string = "sink://"
)

// Config represents untyped YAML configuration.
Expand Down Expand Up @@ -310,3 +337,86 @@ func (c *Config) UnmarshalJSON(data []byte) error {
func (c *Config) DeepCopyInto(out *Config) {
out.Data = runtime.DeepCopyJSON(c.Data)
}

func validPackageLocation(packageLocation string) error {
if hasPackageTypePrefix(packageLocation) {
err := isValidPulsarPackageURL(packageLocation)
if err != nil {
return err
}
} else {
if !isFunctionPackageURLSupported(packageLocation) {
return fmt.Errorf("invalid function package url %s, supported url (http/https)", packageLocation)
}
}

return nil
}

func hasPackageTypePrefix(packageLocation string) bool {
lowerCase := strings.ToLower(packageLocation)
return strings.HasPrefix(lowerCase, PackageURLFunction) ||
strings.HasPrefix(lowerCase, PackageURLSource) ||
strings.HasPrefix(lowerCase, PackageURLSink)
}

func isValidPulsarPackageURL(packageLocation string) error {
parts := strings.Split(packageLocation, "://")
if len(parts) != 2 {
return fmt.Errorf("invalid package name %s", packageLocation)
}
if !hasPackageTypePrefix(packageLocation) {
return fmt.Errorf("invalid package name %s", packageLocation)
}
rest := parts[1]
if !strings.Contains(rest, "@") {
rest += "@"
}
packageParts := strings.Split(rest, "@")
if len(packageParts) != 2 {
return fmt.Errorf("invalid package name %s", packageLocation)
}
partsWithoutVersion := strings.Split(packageParts[0], "/")
if len(partsWithoutVersion) != 3 {
return fmt.Errorf("invalid package name %s", packageLocation)
}
return nil
}

func isFunctionPackageURLSupported(packageLocation string) bool {
// TODO: support file:// schema
lowerCase := strings.ToLower(packageLocation)
return strings.HasPrefix(lowerCase, PackageURLHTTP) ||
strings.HasPrefix(lowerCase, PackageURLHTTPS)
}

func collectAllInputTopics(inputs InputConf) []string {
ret := []string{}
if len(inputs.Topics) > 0 {
ret = append(ret, inputs.Topics...)
}
if inputs.TopicPattern != "" {
ret = append(ret, inputs.TopicPattern)
}
if len(inputs.CustomSerdeSources) > 0 {
for k := range inputs.CustomSerdeSources {
ret = append(ret, k)
}
}
if len(inputs.CustomSchemaSources) > 0 {
for k := range inputs.CustomSchemaSources {
ret = append(ret, k)
}
}
if len(inputs.SourceSpecs) > 0 {
for k := range inputs.SourceSpecs {
ret = append(ret, k)
}
}
return ret
}

func isValidTopicName(topicName string) error {
_, err := pctlutil.GetTopicName(topicName)
return err
}
12 changes: 10 additions & 2 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ import (
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// FunctionSpec defines the desired state of Function
// +kubebuilder:validation:Optional
type FunctionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// +kubebuilder:validation:Required
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
Tenant string `json:"tenant,omitempty"`
Namespace string `json:"namespace,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas,omitempty"`

// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
Expand Down Expand Up @@ -69,8 +73,11 @@ type FunctionSpec struct {

// TODO: windowconfig, customRuntimeOptions?

// +kubebuilder:validation:Required
Messaging `json:",inline"`
Runtime `json:",inline"`

// +kubebuilder:validation:Required
Runtime `json:",inline"`

// Image is the container image used to run function pods.
// default is streamnative/pulsar-functions-java-runner
Expand All @@ -94,6 +101,7 @@ type FunctionStatus struct {
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector

// Function is the Schema for the functions API
// +kubebuilder:pruning:PreserveUnknownFields
type Function struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
Loading