diff --git a/config/core/resources/pingsource.yaml b/config/core/resources/pingsource.yaml index 958cbf94c32..406085be8a6 100644 --- a/config/core/resources/pingsource.yaml +++ b/config/core/resources/pingsource.yaml @@ -64,8 +64,7 @@ spec: will also be set to "application/json".' type: string schedule: - description: 'Schedule is the cronjob schedule. Defaults to `* * * - * *`.' + description: 'Schedule is the cronjob schedule. Defaults to `* * * * *`.' type: string sink: description: 'Sink is a reference to an object that will resolve to @@ -214,8 +213,7 @@ spec: will also be set to "application/json".' type: string schedule: - description: 'Schedule is the cronjob schedule. Defaults to `* * * - * *`.' + description: 'Schedule is the cronjob schedule. Defaults to `* * * * *`.' type: string sink: description: 'Sink is a reference to an object that will resolve to @@ -349,7 +347,7 @@ spec: type: string x-kubernetes-preserve-unknown-fields: true contentType: - description: 'ContentType is the media type of `data` or `dataBase64`. Default is empty' + description: 'ContentType is the media type of `data` or `dataBase64`. Default is empty.' type: string data: description: 'Data is data used as the body of the event posted to the sink. Default is empty. @@ -357,11 +355,10 @@ spec: type: string dataBase64: description: 'DataBase64 is base64 encoded binary data used as the body of the event posted to the sink. - Mutually exclusive with `data`.' + Default is empty. Mutually exclusive with `data`.' type: string schedule: - description: 'Schedule is the cron schedule. Defaults to `* * * - * *`.' + description: 'Schedule is the cron schedule. Defaults to `* * * * *`.' type: string sink: description: 'Sink is a reference to an object that will resolve to diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 4ddd5cd01e6..7d8dfe5bcba 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -19,6 +19,7 @@ package mtping import ( "context" "encoding/json" + "fmt" "math/rand" "time" @@ -71,7 +72,10 @@ func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interf } func (a *cronJobsRunner) AddSchedule(source *v1beta2.PingSource) cron.EntryID { - event := makeEvent(source) + event, err := makeEvent(source) + if err != nil { + a.Logger.Error("failed to makeEvent: ", zap.Error(err)) + } ctx := context.Background() ctx = cloudevents.ContextWithTarget(ctx, source.Status.SinkURI.String()) @@ -132,7 +136,7 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) } } -func makeEvent(source *v1beta2.PingSource) cloudevents.Event { +func makeEvent(source *v1beta2.PingSource) (cloudevents.Event, error) { event := cloudevents.NewEvent() event.SetType(v1beta2.PingSourceEventType) event.SetSource(v1beta2.PingSourceSource(source.Namespace, source.Name)) @@ -149,7 +153,7 @@ func makeEvent(source *v1beta2.PingSource) cloudevents.Event { // b. If contentType is `application/json`, unmarshal it into an interface, event.DataEncoded will be json.Marshal(interface), // this is to be compatible with the existing v1beta1 PingSource -> CloudEvent conversion logic, to make sure // that `data` is populated in the cloudevent json format instead of `data_base64`, and not breaking subscribers - // that does not leverage cloudevents sdk. + // that do not leverage cloudevents sdk. var data interface{} if source.Spec.DataBase64 != "" { data = []byte(source.Spec.DataBase64) @@ -159,7 +163,9 @@ func makeEvent(source *v1beta2.PingSource) cloudevents.Event { // unmarshal the body into an interface, JSON validation is done in pingsource_validation // ignoring the error returned by json.Unmarshal here. var objmap map[string]*json.RawMessage - _ = json.Unmarshal([]byte(source.Spec.Data), &objmap) + if err := json.Unmarshal([]byte(source.Spec.Data), &objmap); err != nil { + return event, fmt.Errorf("error unmarshalling source.Spec.Data: %v, err: %v", source.Spec.Data, err) + } data = objmap default: data = []byte(source.Spec.Data) @@ -167,8 +173,10 @@ func makeEvent(source *v1beta2.PingSource) cloudevents.Event { } if data != nil { - _ = event.SetData(source.Spec.ContentType, data) + if err := event.SetData(source.Spec.ContentType, data); err != nil { + return event, fmt.Errorf("error when SetData(%v, %v), err: %v", source.Spec.ContentType, data, err) + } } - return event + return event, nil } diff --git a/pkg/apis/sources/v1beta1/ping_conversion.go b/pkg/apis/sources/v1beta1/ping_conversion.go index bfb9d0485e2..c68bfdb3ff3 100644 --- a/pkg/apis/sources/v1beta1/ping_conversion.go +++ b/pkg/apis/sources/v1beta1/ping_conversion.go @@ -77,7 +77,7 @@ func (source *PingSource) ConvertTo(ctx context.Context, obj apis.Convertible) e delete(annotations, V1B2SpecAnnotationKey) } - // cannot unmarshal, do a normal conversion + // if V1B2SpecAnnotationKey does not exist or we cannot unmarshal it, do a normal conversion if !ok { sink.Spec = v1beta2.PingSourceSpec{ SourceSpec: source.Spec.SourceSpec, @@ -97,7 +97,10 @@ func (source *PingSource) ConvertTo(ctx context.Context, obj apis.Convertible) e // marshal and store v1beta1.PingSource.Spec into V1B1SpecAnnotationKey // this is to help if we need to convert back to v1beta1.PingSource - v1beta1Spec, _ := json.Marshal(source.Spec) + v1beta1Spec, err := json.Marshal(source.Spec) + if err != nil { + return fmt.Errorf("error marshalling source.Spec: %v, err: %v", source.Spec, err) + } annotations[V1B1SpecAnnotationKey] = string(v1beta1Spec) sink.SetAnnotations(annotations) @@ -136,7 +139,10 @@ func (sink *PingSource) ConvertFrom(ctx context.Context, obj apis.Convertible) e // marshal and store v1beta2.PingSource.Spec into V1B2SpecAnnotationKey // this is to help if we need to convert back to v1beta2.PingSource - v1beta2Configuration, _ := json.Marshal(source.Spec) + v1beta2Configuration, err := json.Marshal(source.Spec) + if err != nil { + return fmt.Errorf("error marshalling source.Spec: %v, err: %v", source.Spec, err) + } annotations[V1B2SpecAnnotationKey] = string(v1beta2Configuration) sink.SetAnnotations(annotations) diff --git a/pkg/apis/sources/v1beta2/ping_types.go b/pkg/apis/sources/v1beta2/ping_types.go index 414a27cedcc..45beab33867 100644 --- a/pkg/apis/sources/v1beta2/ping_types.go +++ b/pkg/apis/sources/v1beta2/ping_types.go @@ -68,7 +68,7 @@ type PingSourceSpec struct { // List of valid timezone values: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones Timezone string `json:"timezone,omitempty"` - // ContentType is the media type of Data or DataBase64. Default is empty + // ContentType is the media type of Data or DataBase64. Default is empty. // +optional ContentType string `json:"contentType,omitempty"` diff --git a/pkg/apis/sources/v1beta2/ping_validation.go b/pkg/apis/sources/v1beta2/ping_validation.go index dfab0499cbc..325fe9136e2 100644 --- a/pkg/apis/sources/v1beta2/ping_validation.go +++ b/pkg/apis/sources/v1beta2/ping_validation.go @@ -56,27 +56,26 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { if cs.Data != "" && cs.DataBase64 != "" { errs = errs.Also(apis.ErrMultipleOneOf("data", "dataBase64")) - } else { - if cs.DataBase64 != "" { - decoded, err := base64.StdEncoding.DecodeString(cs.DataBase64) - // invalid base64 string - if err != nil { - errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64")) - } else { - // validate if the decoded base64 string is valid JSON - if cs.ContentType == cloudevents.ApplicationJSON { - if err := validateJSON(string(decoded)); err != nil { - errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64")) - } + } else if cs.DataBase64 != "" { + decoded, err := base64.StdEncoding.DecodeString(cs.DataBase64) + // invalid base64 string + if err != nil { + errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64")) + } else { + // validate if the decoded base64 string is valid JSON + if cs.ContentType == cloudevents.ApplicationJSON { + if err := validateJSON(string(decoded)); err != nil { + errs = errs.Also(apis.ErrInvalidValue(err, "dataBase64")) } } - } else if cs.Data != "" && cs.ContentType == cloudevents.ApplicationJSON { - // validate if data is valid JSON - if err := validateJSON(cs.Data); err != nil { - errs = errs.Also(apis.ErrInvalidValue(err, "data")) - } + } + } else if cs.Data != "" && cs.ContentType == cloudevents.ApplicationJSON { + // validate if data is valid JSON + if err := validateJSON(cs.Data); err != nil { + errs = errs.Also(apis.ErrInvalidValue(err, "data")) } } + return errs }