Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions config/core/resources/pingsource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ spec:
will also be set to "application/json".'
type: string
schedule:
description: 'Schedule is the cronjob schedule. Defaults to `* * *
* *`.'
description: 'Schedule is the cronjob schedule. Defaults to `* * * * *`.'
type: string
sink:
description: 'Sink is a reference to an object that will resolve to
Expand Down Expand Up @@ -214,8 +213,7 @@ spec:
will also be set to "application/json".'
type: string
schedule:
description: 'Schedule is the cronjob schedule. Defaults to `* * *
* *`.'
description: 'Schedule is the cronjob schedule. Defaults to `* * * * *`.'
type: string
sink:
description: 'Sink is a reference to an object that will resolve to
Expand Down Expand Up @@ -349,19 +347,18 @@ spec:
type: string
x-kubernetes-preserve-unknown-fields: true
contentType:
description: 'ContentType is the media type of `data` or `dataBase64`. Default is empty'
description: 'ContentType is the media type of `data` or `dataBase64`. Default is empty.'
type: string
data:
description: 'Data is data used as the body of the event posted to the sink. Default is empty.
Mutually exclusive with `dataBase64`.'
type: string
dataBase64:
description: 'DataBase64 is base64 encoded binary data used as the body of the event posted to the sink.
Mutually exclusive with `data`.'
Default is empty. Mutually exclusive with `data`.'
type: string
schedule:
description: 'Schedule is the cron schedule. Defaults to `* * *
* *`.'
description: 'Schedule is the cron schedule. Defaults to `* * * * *`.'
type: string
sink:
description: 'Sink is a reference to an object that will resolve to
Expand Down
20 changes: 14 additions & 6 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mtping
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"time"

Expand Down Expand Up @@ -71,7 +72,10 @@ func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interf
}

func (a *cronJobsRunner) AddSchedule(source *v1beta2.PingSource) cron.EntryID {
event := makeEvent(source)
event, err := makeEvent(source)
if err != nil {
a.Logger.Error("failed to makeEvent: ", zap.Error(err))
}

ctx := context.Background()
ctx = cloudevents.ContextWithTarget(ctx, source.Status.SinkURI.String())
Expand Down Expand Up @@ -132,7 +136,7 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)
}
}

func makeEvent(source *v1beta2.PingSource) cloudevents.Event {
func makeEvent(source *v1beta2.PingSource) (cloudevents.Event, error) {
event := cloudevents.NewEvent()
event.SetType(v1beta2.PingSourceEventType)
event.SetSource(v1beta2.PingSourceSource(source.Namespace, source.Name))
Expand All @@ -149,7 +153,7 @@ func makeEvent(source *v1beta2.PingSource) cloudevents.Event {
// b. If contentType is `application/json`, unmarshal it into an interface, event.DataEncoded will be json.Marshal(interface),
// this is to be compatible with the existing v1beta1 PingSource -> CloudEvent conversion logic, to make sure
// that `data` is populated in the cloudevent json format instead of `data_base64`, and not breaking subscribers
// that does not leverage cloudevents sdk.
// that do not leverage cloudevents sdk.
var data interface{}
if source.Spec.DataBase64 != "" {
data = []byte(source.Spec.DataBase64)
Expand All @@ -159,16 +163,20 @@ func makeEvent(source *v1beta2.PingSource) cloudevents.Event {
// unmarshal the body into an interface, JSON validation is done in pingsource_validation
// ignoring the error returned by json.Unmarshal here.
var objmap map[string]*json.RawMessage
_ = json.Unmarshal([]byte(source.Spec.Data), &objmap)
if err := json.Unmarshal([]byte(source.Spec.Data), &objmap); err != nil {
return event, fmt.Errorf("error unmarshalling source.Spec.Data: %v, err: %v", source.Spec.Data, err)
}
data = objmap
default:
data = []byte(source.Spec.Data)
}
}

if data != nil {
_ = event.SetData(source.Spec.ContentType, data)
if err := event.SetData(source.Spec.ContentType, data); err != nil {
return event, fmt.Errorf("error when SetData(%v, %v), err: %v", source.Spec.ContentType, data, err)
}
}

return event
return event, nil
}
12 changes: 9 additions & 3 deletions pkg/apis/sources/v1beta1/ping_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (source *PingSource) ConvertTo(ctx context.Context, obj apis.Convertible) e
delete(annotations, V1B2SpecAnnotationKey)
}

// cannot unmarshal, do a normal conversion
// if V1B2SpecAnnotationKey does not exist or we cannot unmarshal it, do a normal conversion
if !ok {
sink.Spec = v1beta2.PingSourceSpec{
SourceSpec: source.Spec.SourceSpec,
Expand All @@ -97,7 +97,10 @@ func (source *PingSource) ConvertTo(ctx context.Context, obj apis.Convertible) e

// marshal and store v1beta1.PingSource.Spec into V1B1SpecAnnotationKey
// this is to help if we need to convert back to v1beta1.PingSource
v1beta1Spec, _ := json.Marshal(source.Spec)
v1beta1Spec, err := json.Marshal(source.Spec)
if err != nil {
return fmt.Errorf("error marshalling source.Spec: %v, err: %v", source.Spec, err)
}
annotations[V1B1SpecAnnotationKey] = string(v1beta1Spec)
sink.SetAnnotations(annotations)

Expand Down Expand Up @@ -136,7 +139,10 @@ func (sink *PingSource) ConvertFrom(ctx context.Context, obj apis.Convertible) e

// marshal and store v1beta2.PingSource.Spec into V1B2SpecAnnotationKey
// this is to help if we need to convert back to v1beta2.PingSource
v1beta2Configuration, _ := json.Marshal(source.Spec)
v1beta2Configuration, err := json.Marshal(source.Spec)
if err != nil {
return fmt.Errorf("error marshalling source.Spec: %v, err: %v", source.Spec, err)
}
annotations[V1B2SpecAnnotationKey] = string(v1beta2Configuration)
sink.SetAnnotations(annotations)

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/sources/v1beta2/ping_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type PingSourceSpec struct {
// List of valid timezone values: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
Timezone string `json:"timezone,omitempty"`

// ContentType is the media type of Data or DataBase64. Default is empty
// ContentType is the media type of Data or DataBase64. Default is empty.
// +optional
ContentType string `json:"contentType,omitempty"`

Expand Down
33 changes: 16 additions & 17 deletions pkg/apis/sources/v1beta2/ping_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,26 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError {

if cs.Data != "" && cs.DataBase64 != "" {
errs = errs.Also(apis.ErrMultipleOneOf("data", "dataBase64"))
} else {
if cs.DataBase64 != "" {
decoded, err := base64.StdEncoding.DecodeString(cs.DataBase64)
// invalid base64 string
if err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64"))
} else {
// validate if the decoded base64 string is valid JSON
if cs.ContentType == cloudevents.ApplicationJSON {
if err := validateJSON(string(decoded)); err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64"))
}
} else if cs.DataBase64 != "" {
decoded, err := base64.StdEncoding.DecodeString(cs.DataBase64)
// invalid base64 string
if err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64"))
} else {
// validate if the decoded base64 string is valid JSON
if cs.ContentType == cloudevents.ApplicationJSON {
if err := validateJSON(string(decoded)); err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64"))
}
}
} else if cs.Data != "" && cs.ContentType == cloudevents.ApplicationJSON {
// validate if data is valid JSON
if err := validateJSON(cs.Data); err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "data"))
}
}
} else if cs.Data != "" && cs.ContentType == cloudevents.ApplicationJSON {
// validate if data is valid JSON
if err := validateJSON(cs.Data); err != nil {
errs = errs.Also(apis.ErrInvalidValue(err, "data"))
}
}

return errs
}

Expand Down