Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .ci/ct.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
check-version-increment: false
chart-dirs:
- charts
all: true
all: false
validate-maintainers: false
helm-extra-args: --timeout 2000s
namespace: default
release-label: release
25 changes: 25 additions & 0 deletions .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,29 @@ function ci::verify_mesh_worker_service_pulsar_admin() {
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sinks delete --name data-generator-sink)
echo $RET
${KUBECTL} get pods -n ${NAMESPACE}
echo " === verify mesh worker service with empty connector config"
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options '{"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}')
echo $RET
if [[ $RET != *"successfully"* ]]; then
return 1
fi
WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l)
while [[ ${WC} -lt 1 ]]; do
echo ${WC};
sleep 20
${KUBECTL} get pods -n ${NAMESPACE}
WC=$(${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep "data-generator-source" | wc -l)
done
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source)
if [[ $RET != *"true"* ]]; then
${KUBECTL} logs -n ${NAMESPACE} data-generator-source-69865103-source-0
${KUBECTL} get pods data-generator-source-69865103-source-0 -o yaml
return 1
fi
${KUBECTL} get pods -n ${NAMESPACE}
RET=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin sources delete --name data-generator-source)
echo $RET
${KUBECTL} get pods -n ${NAMESPACE}

}
5 changes: 4 additions & 1 deletion .github/workflows/test-helm-charts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ jobs:
if: steps.list-changed.outputs.changed == 'true'

- name: Create kind cluster
uses: helm/kind-action@v1.1.0
uses: helm/kind-action@v1.2.0
if: steps.list-changed.outputs.changed == 'true'
with:
node_image: kindest/node:v1.15.12
wait: 600s

- name: Set up GO 1.13
if: steps.list-changed.outputs.changed == 'true'
Expand Down
37 changes: 37 additions & 0 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package v1alpha1

import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

type Messaging struct {
Expand Down Expand Up @@ -251,3 +254,37 @@ const (
SourceComponent string = "source"
SinkComponent string = "sink"
)

// Config represents untyped YAML configuration.
type Config struct {
// Data holds the configuration keys and values.
// This field exists to work around https://github.com/kubernetes-sigs/kubebuilder/issues/528
Data map[string]interface{} `json:"-"`
}

// NewConfig constructs a Config with the given unstructured configuration data.
func NewConfig(cfg map[string]interface{}) Config {
return Config{Data: cfg}
}

// MarshalJSON implements the Marshaler interface.
func (c *Config) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Data)
}

// UnmarshalJSON implements the Unmarshaler interface.
func (c *Config) UnmarshalJSON(data []byte) error {
var out map[string]interface{}
err := json.Unmarshal(data, &out)
if err != nil {
return err
}
c.Data = out
return nil
}

// DeepCopyInto is an ~autogenerated~ deepcopy function, copying the receiver, writing into out. in must be non-nil.
// This exists here to work around https://github.com/kubernetes/code-generator/issues/50
func (c *Config) DeepCopyInto(out *Config) {
out.Data = runtime.DeepCopyJSON(c.Data)
}
22 changes: 12 additions & 10 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
type FunctionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
Tenant string `json:"tenant,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Input InputConf `json:"input,omitempty"`
Output OutputConf `json:"output,omitempty"`
LogTopic string `json:"logTopic,omitempty"`
FuncConfig map[string]string `json:"funcConfig,omitempty"`
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
Tenant string `json:"tenant,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Input InputConf `json:"input,omitempty"`
Output OutputConf `json:"output,omitempty"`
LogTopic string `json:"logTopic,omitempty"`
// +kubebuilder:validation:Optional
// +kubebuilder:pruning:PreserveUnknownFields
FuncConfig *Config `json:"funcConfig,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
Expand Down
21 changes: 12 additions & 9 deletions api/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ import (
type SinkSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Tenant string `json:"tenant,omitempty"`
SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Input InputConf `json:"input,omitempty"`
SinkConfig map[string]string `json:"sinkConfig,omitempty"`
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Tenant string `json:"tenant,omitempty"`
SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Input InputConf `json:"input,omitempty"`

// +kubebuilder:validation:Optional
// +kubebuilder:pruning:PreserveUnknownFields
SinkConfig *Config `json:"sinkConfig,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
Expand Down
21 changes: 12 additions & 9 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ import (
type SourceSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
Tenant string `json:"tenant,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Output OutputConf `json:"output,omitempty"`
SourceConfig map[string]string `json:"sourceConfig,omitempty"`
Name string `json:"name,omitempty"`
ClassName string `json:"className,omitempty"`
Tenant string `json:"tenant,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector
Replicas *int32 `json:"replicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
Output OutputConf `json:"output,omitempty"`

// +kubebuilder:validation:Optional
// +kubebuilder:pruning:PreserveUnknownFields
SourceConfig *Config `json:"sourceConfig,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
Expand Down
25 changes: 13 additions & 12 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ spec:
forwardSourceMessageProperty:
type: boolean
funcConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
golang:
properties:
go:
Expand Down Expand Up @@ -4411,9 +4410,8 @@ spec:
type: object
type: object
sinkConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sinkType:
type: string
subscriptionName:
Expand Down Expand Up @@ -6563,9 +6561,8 @@ spec:
type: object
type: object
sourceConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sourceType:
type: string
tenant:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ spec:
forwardSourceMessageProperty:
type: boolean
funcConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
golang:
properties:
go:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2172,9 +2172,8 @@ spec:
type: object
type: object
sinkConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sinkType:
type: string
subscriptionName:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2145,9 +2145,8 @@ spec:
type: object
type: object
sourceConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sourceType:
type: string
tenant:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ spec:
forwardSourceMessageProperty:
type: boolean
funcConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
golang:
properties:
go:
Expand Down Expand Up @@ -4409,9 +4408,8 @@ spec:
type: object
type: object
sinkConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sinkType:
type: string
subscriptionName:
Expand Down Expand Up @@ -6559,9 +6557,8 @@ spec:
type: object
type: object
sourceConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sourceType:
type: string
tenant:
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ spec:
forwardSourceMessageProperty:
type: boolean
funcConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
golang:
properties:
go:
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/compute.functionmesh.io_sinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2170,9 +2170,8 @@ spec:
type: object
type: object
sinkConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sinkType:
type: string
subscriptionName:
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/compute.functionmesh.io_sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2143,9 +2143,8 @@ spec:
type: object
type: object
sourceConfig:
additionalProperties:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
sourceType:
type: string
tenant:
Expand Down
5 changes: 4 additions & 1 deletion controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,10 @@ func generateResource(resources corev1.ResourceList) *proto.Resources {
}
}

func getUserConfig(configs map[string]string) string {
func getUserConfig(configs *v1alpha1.Config) string {
if configs == nil {
return ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"" might not be a valid json, will that be a problem?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the userConfig in pulsar functions' proto FunctionDetails is defined as string, and the default value is "", so "" would be fine if there is no userConfig provided.

}
// validated in admission web hook
bytes, _ := json.Marshal(configs)
return string(bytes)
Expand Down
Loading