diff --git a/Gopkg.lock b/Gopkg.lock index ee0dd77bf6b..a23af936e8b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -58,7 +58,7 @@ version = "v2.1.15" [[projects]] - digest = "1:70a8e5f09e19aba14064648751abc86f20484e25476866b03ce711007e96f339" + digest = "1:7ded9717607b3eea18859dca3323fac420fa142ff4aa592cb07d8c7eb4a67d5e" name = "github.com/cloudevents/sdk-go" packages = [ "pkg/cloudevents", @@ -68,13 +68,14 @@ "pkg/cloudevents/datacodec", "pkg/cloudevents/datacodec/json", "pkg/cloudevents/datacodec/xml", + "pkg/cloudevents/observability", "pkg/cloudevents/transport", "pkg/cloudevents/transport/http", "pkg/cloudevents/types", ] pruneopts = "NUT" - revision = "c1a6fb0cc8226884014fb5063ab406ed9504a663" - version = "0.4.0" + revision = "a4c06e590662f2aa30ac8f5974347c9832889765" + version = "0.4.1" [[projects]] digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" diff --git a/Gopkg.toml b/Gopkg.toml index 4f616e4b7fc..1007ae38aea 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -131,7 +131,7 @@ required = [ name = "github.com/nats-io/nats-streaming-server" version = "0.11.0" -# The latest release as of March 12, 2019. +# The latest release as of March 20, 2019. [[constraint]] name = "github.com/cloudevents/sdk-go" - version = "=0.4.0" + version = "=0.4.1" diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index 8d06df8c9d9..ca590b6fe34 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -21,6 +21,7 @@ import ( "errors" "net/http" "net/url" + "strings" "time" "github.com/cloudevents/sdk-go/pkg/cloudevents" @@ -31,6 +32,7 @@ import ( "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -40,6 +42,22 @@ const ( writeTimeout = 1 * time.Minute ) +var ( + // These MUST be lowercase strings, as they will be compared against lowercase strings. + forwardHeaders = sets.NewString( + // tracing + "x-request-id", + ) + // These MUST be lowercase strings, as they will be compared against lowercase strings. + forwardPrefixes = []string{ + // knative + "knative-", + // tracing + "x-b3-", + "x-ot-", + } +) + // Receiver parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Receiver struct { logger *zap.Logger @@ -142,18 +160,22 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp r.logger.Debug("Received message", zap.Any("triggerRef", triggerRef)) - responseEvent, err := r.sendEvent(ctx, triggerRef, &event) + responseEvent, err := r.sendEvent(ctx, tctx, triggerRef, &event) if err != nil { r.logger.Error("Error sending the event", zap.Error(err)) return err } resp.Status = http.StatusAccepted resp.Event = responseEvent + + // TODO Add filtered headers (mostly tracing) to the response. We are waiting for CloudEvents + // SDK to allow this. + return nil } // sendEvent sends an event to a subscriber if the trigger filter passes. -func (r *Receiver) sendEvent(ctx context.Context, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { +func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { t, err := r.getTrigger(ctx, trigger) if err != nil { r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) @@ -179,6 +201,7 @@ func (r *Receiver) sendEvent(ctx context.Context, trigger provisioners.ChannelRe } sendingCtx := cecontext.WithTarget(ctx, subscriberURI.String()) + sendingCtx = addFilteredHeaders(sendingCtx, tctx) return r.ceHttp.Send(sendingCtx, *event) } @@ -214,3 +237,28 @@ func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, event *cl } return true } + +func addFilteredHeaders(ctx context.Context, tctx cehttp.TransportContext) context.Context { + // Helper function that adds the header name and all its values. + addHeader := func(ctx context.Context, n string, v []string) context.Context { + for _, iv := range v { + ctx = cehttp.ContextWithHeader(ctx, n, iv) + } + return ctx + } + + for n, v := range tctx.Header { + lower := strings.ToLower(n) + if forwardHeaders.Has(lower) { + ctx = addHeader(ctx, n, v) + continue + } + for _, prefix := range forwardPrefixes { + if strings.HasPrefix(lower, prefix) { + ctx = addHeader(ctx, n, v) + break + } + } + } + return ctx +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/client.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/client.go index d9c948cd7a4..41544d929b4 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "sync" @@ -69,6 +70,17 @@ type ceClient struct { } func (c *ceClient) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { + ctx, r := observability.NewReporter(ctx, ReportSend) + resp, err := c.obsSend(ctx, event) + if err != nil { + r.Error() + } else { + r.OK() + } + return resp, err +} + +func (c *ceClient) obsSend(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { // Confirm we have a transport set. if c.transport == nil { return nil, fmt.Errorf("client not ready, transport not initialized") @@ -89,8 +101,26 @@ func (c *ceClient) Send(ctx context.Context, event cloudevents.Event) (*cloudeve // Receive is called from from the transport on event delivery. func (c *ceClient) Receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { + ctx, r := observability.NewReporter(ctx, ReportReceive) + err := c.obsReceive(ctx, event, resp) + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} + +func (c *ceClient) obsReceive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { if c.fn != nil { + ctx, rFn := observability.NewReporter(ctx, ReportReceiveFn) err := c.fn.invoke(ctx, event, resp) + if err != nil { + rFn.Error() + } else { + rFn.OK() + } + // Apply the defaulter chain to the outgoing event. if err == nil && resp != nil && resp.Event != nil && len(c.eventDefaulterFns) > 0 { for _, fn := range c.eventDefaulterFns { diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/doc.go new file mode 100644 index 00000000000..a6a602bb410 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/doc.go @@ -0,0 +1,6 @@ +/* +Package client holds the recommended entry points for interacting with the CloudEvents Golang SDK. The client wraps +a selected transport. The client adds validation and defaulting for sending events, and flexible receiver method +registration. For full details, read the `client.Client` documentation. +*/ +package client diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/observability.go new file mode 100644 index 00000000000..69f0df2dfd6 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/client/observability.go @@ -0,0 +1,59 @@ +package client + +import ( + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64("client/latency", "The latency in milliseconds for the CloudEvents client methods.", "ms") +) + +var ( + LatencyView = &view.View{ + Name: "client/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of client for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportSend Observed = iota + ReportReceive + ReportReceiveFn +) + +func (o Observed) TraceName() string { + switch o { + case ReportSend: + return "client/send" + case ReportReceive: + return "client/receive" + case ReportReceiveFn: + return "client/receive/fn" + default: + return "client/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportSend: + return "send" + case ReportReceive: + return "receive" + case ReportReceiveFn: + return "receive/fn" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/doc.go new file mode 100644 index 00000000000..f6028398f5a --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/doc.go @@ -0,0 +1,5 @@ +/* +Package codec holds the encoder/decoder implementation for structured encodings using `application/json` of the +whole CloudEvent. +*/ +package codec diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/jsoncodec.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/jsoncodec.go index 21f10c68375..42847d88c0d 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/jsoncodec.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/jsoncodec.go @@ -1,14 +1,29 @@ package codec import ( + "context" "encoding/json" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "log" "strconv" ) +// JsonEncodeV01 takes in a cloudevent.Event and outputs the byte representation of that event using CloudEvents +// version 0.1 structured json formatting rules. func JsonEncodeV01(e cloudevents.Event) ([]byte, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, v: "v0.1"}) + b, err := obsJsonEncodeV01(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsJsonEncodeV01(e cloudevents.Event) ([]byte, error) { ctx := e.Context.AsV01() if ctx.ContentType == nil { ctx.ContentType = cloudevents.StringOfApplicationJSON() @@ -16,7 +31,20 @@ func JsonEncodeV01(e cloudevents.Event) ([]byte, error) { return jsonEncode(ctx, e.Data) } +// JsonEncodeV02 takes in a cloudevent.Event and outputs the byte representation of that event using CloudEvents +// version 0.2 structured json formatting rules. func JsonEncodeV02(e cloudevents.Event) ([]byte, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, v: "v0.2"}) + b, err := obsJsonEncodeV02(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsJsonEncodeV02(e cloudevents.Event) ([]byte, error) { ctx := e.Context.AsV02() if ctx.ContentType == nil { ctx.ContentType = cloudevents.StringOfApplicationJSON() @@ -24,7 +52,20 @@ func JsonEncodeV02(e cloudevents.Event) ([]byte, error) { return jsonEncode(ctx, e.Data) } +// JsonEncodeV03 takes in a cloudevent.Event and outputs the byte representation of that event using CloudEvents +// version 0.3 structured json formatting rules. func JsonEncodeV03(e cloudevents.Event) ([]byte, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, v: "v0.3"}) + b, err := obsJsonEncodeV03(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsJsonEncodeV03(e cloudevents.Event) ([]byte, error) { ctx := e.Context.AsV03() if ctx.DataContentType == nil { ctx.DataContentType = cloudevents.StringOfApplicationJSON() @@ -69,7 +110,20 @@ func jsonEncode(ctx cloudevents.EventContext, data interface{}) ([]byte, error) return body, nil } +// JsonDecodeV01 takes in the byte representation of a version 0.1 structured json CloudEvent and returns a +// cloudevent.Event or an error if there are parsing errors. func JsonDecodeV01(body []byte) (*cloudevents.Event, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, v: "v0.1"}) + e, err := obsJsonDecodeV01(body) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func obsJsonDecodeV01(body []byte) (*cloudevents.Event, error) { ec := cloudevents.EventContextV01{} if err := json.Unmarshal(body, &ec); err != nil { return nil, err @@ -91,7 +145,20 @@ func JsonDecodeV01(body []byte) (*cloudevents.Event, error) { }, nil } +// JsonDecodeV02 takes in the byte representation of a version 0.2 structured json CloudEvent and returns a +// cloudevent.Event or an error if there are parsing errors. func JsonDecodeV02(body []byte) (*cloudevents.Event, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, v: "v0.2"}) + e, err := obsJsonDecodeV02(body) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func obsJsonDecodeV02(body []byte) (*cloudevents.Event, error) { ec := cloudevents.EventContextV02{} if err := json.Unmarshal(body, &ec); err != nil { return nil, err @@ -113,7 +180,20 @@ func JsonDecodeV02(body []byte) (*cloudevents.Event, error) { }, nil } +// JsonDecodeV03 takes in the byte representation of a version 0.3 structured json CloudEvent and returns a +// cloudevent.Event or an error if there are parsing errors. func JsonDecodeV03(body []byte) (*cloudevents.Event, error) { + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, v: "v0.3"}) + e, err := obsJsonDecodeV03(body) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func obsJsonDecodeV03(body []byte) (*cloudevents.Event, error) { ec := cloudevents.EventContextV03{} if err := json.Unmarshal(body, &ec); err != nil { return nil, err diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/observability.go new file mode 100644 index 00000000000..c710ea191f0 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/codec/observability.go @@ -0,0 +1,75 @@ +package codec + +import ( + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64("codec/json/latency", "The latency in milliseconds for the CloudEvents json codec methods.", "ms") +) + +var ( + LatencyView = &view.View{ + Name: "codec/json/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of the json codec for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportEncode Observed = iota + ReportDecode +) + +func (o Observed) TraceName() string { + switch o { + case ReportEncode: + return "codec/json/encode" + case ReportDecode: + return "codec/json/decode" + default: + return "codec/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportEncode: + return "encode" + case ReportDecode: + return "decode" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} + +// CodecObserved is a wrapper to append version to Observed. +type CodecObserved struct { + // Method + o Observed + // Version + v string +} + +func (c CodecObserved) TraceName() string { + return fmt.Sprintf("%s/%s", c.o.TraceName(), c.v) +} + +func (c CodecObserved) MethodName() string { + return fmt.Sprintf("%s/%s", c.o.MethodName(), c.v) +} + +func (c CodecObserved) LatencyMs() *stats.Float64Measure { + return c.o.LatencyMs() +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/context.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/context.go index 2c4526b1035..18afb6282b6 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/context.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/context.go @@ -10,10 +10,13 @@ type targetKeyType struct{} var targetKey = targetKeyType{} +// WithTarget returns back a new context with the given target. Target is intended to be transport dependent. +// For http transport, `target` should be a full URL and will be injected into the outbound http request. func WithTarget(ctx context.Context, target string) context.Context { return context.WithValue(ctx, targetKey, target) } +// TargetFrom looks in the given context and returns `target` as a parsed url if found and valid, otherwise nil. func TargetFrom(ctx context.Context) *url.URL { c := ctx.Value(targetKey) if c != nil { diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/doc.go new file mode 100644 index 00000000000..377cab850fc --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/context/doc.go @@ -0,0 +1,5 @@ +/* +Package context holds the last resort overrides and fyi objects that can be passed to clients and transports added to +context.Context objects. +*/ +package context diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/codec.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/codec.go index 1f6ce63d365..929a8754bb9 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/codec.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/codec.go @@ -1,12 +1,20 @@ package datacodec import ( + "context" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json" "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" ) +// Decoder is the expected function signature for decoding `in` to `out`. What +// `in` is could be decoder dependent. For example, `in` could be bytes, or a +// base64 string. type Decoder func(in, out interface{}) error + +// Encoder is the expected function signature for encoding `in` to bytes. +// Returns an error if the encoder has an issue encoding `in`. type Encoder func(in interface{}) ([]byte, error) var decoder map[string]Decoder @@ -25,22 +33,56 @@ func init() { AddEncoder("application/xml", xml.Encode) } +// AddDecoder registers a decoder for a given content type. The codecs will use +// these to decode the data payload from a cloudevent.Event object. func AddDecoder(contentType string, fn Decoder) { decoder[contentType] = fn } +// AddEncoder registers an encoder for a given content type. The codecs will +// use these to encode the data payload for a cloudevent.Event object. func AddEncoder(contentType string, fn Encoder) { encoder[contentType] = fn } +// Decode looks up and invokes the decoder registered for the given content +// type. An error is returned if no decoder is registered for the given +// content type. func Decode(contentType string, in, out interface{}) error { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportDecode) + err := obsDecode(contentType, in, out) + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} + +func obsDecode(contentType string, in, out interface{}) error { if fn, ok := decoder[contentType]; ok { return fn(in, out) } return fmt.Errorf("[decode] unsupported content type: %q", contentType) } +// Encode looks up and invokes the encoder registered for the given content +// type. An error is returned if no encoder is registered for the given +// content type. func Encode(contentType string, in interface{}) ([]byte, error) { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportEncode) + b, err := obsEncode(contentType, in) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsEncode(contentType string, in interface{}) ([]byte, error) { if fn, ok := encoder[contentType]; ok { return fn(in) } diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/doc.go new file mode 100644 index 00000000000..9e401534e27 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/doc.go @@ -0,0 +1,5 @@ +/* +Package datacodec holds the data codec registry and adds known encoders and decoders supporting media types such as +`application/json` and `application/xml`. +*/ +package datacodec diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/data.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/data.go index 38d75566713..9b362d305f2 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/data.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/data.go @@ -1,13 +1,30 @@ package json import ( + "context" "encoding/json" "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "reflect" "strconv" ) +// Decode takes `in` as []byte, or base64 string, normalizes in to unquoted and +// base64 decoded []byte if required, and then attempts to use json.Unmarshal +// to convert those bytes to `out`. Returns and error if this process fails. func Decode(in, out interface{}) error { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportDecode) + err := obsDecode(in, out) + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} + +func obsDecode(in, out interface{}) error { if in == nil { return nil } @@ -42,7 +59,22 @@ func Decode(in, out interface{}) error { return nil } +// Encode attempts to json.Marshal `in` into bytes. Encode will inspect `in` +// and returns `in` unmodified if it is detected that `in` is already a []byte; +// Or json.Marshal errors. func Encode(in interface{}) ([]byte, error) { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportEncode) + b, err := obsEncode(in) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsEncode(in interface{}) ([]byte, error) { if in == nil { return nil, nil } diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/doc.go new file mode 100644 index 00000000000..2f3d2982d25 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/doc.go @@ -0,0 +1,4 @@ +/* +Package datacodec/json holds the encoder/decoder implementation for `application/json`. +*/ +package json diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/observability.go new file mode 100644 index 00000000000..5be43645486 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json/observability.go @@ -0,0 +1,54 @@ +package json + +import ( + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64("datacodec/json/latency", "The latency in milliseconds for the CloudEvents json data codec methods.", "ms") +) + +var ( + LatencyView = &view.View{ + Name: "datacodec/json/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of the json data codec for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportEncode Observed = iota + ReportDecode +) + +func (o Observed) TraceName() string { + switch o { + case ReportEncode: + return "datacodec/json/encode" + case ReportDecode: + return "datacodec/json/decode" + default: + return "datacodec/json/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportEncode: + return "encode" + case ReportDecode: + return "decode" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/observability.go new file mode 100644 index 00000000000..0a87278f811 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/observability.go @@ -0,0 +1,54 @@ +package datacodec + +import ( + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64("datacodec/latency", "The latency in milliseconds for the CloudEvents generic data codec methods.", "ms") +) + +var ( + LatencyView = &view.View{ + Name: "datacodec/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of the generic data codec for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportEncode Observed = iota + ReportDecode +) + +func (o Observed) TraceName() string { + switch o { + case ReportEncode: + return "datacodec/encode" + case ReportDecode: + return "datacodec/decode" + default: + return "datacodec/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportEncode: + return "encode" + case ReportDecode: + return "decode" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/data.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/data.go index a904a250c07..8a644932f2a 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/data.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/data.go @@ -1,13 +1,30 @@ package xml import ( + "context" "encoding/base64" "encoding/xml" "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "strconv" ) +// Decode takes `in` as []byte, or base64 string, normalizes in to unquoted and +// base64 decoded []byte if required, and then attempts to use xml.Unmarshal +// to convert those bytes to `out`. Returns and error if this process fails. func Decode(in, out interface{}) error { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportDecode) + err := obsDecode(in, out) + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} + +func obsDecode(in, out interface{}) error { if in == nil { return nil } @@ -47,7 +64,22 @@ func Decode(in, out interface{}) error { return nil } +// Encode attempts to xml.Marshal `in` into bytes. Encode will inspect `in` +// and returns `in` unmodified if it is detected that `in` is already a []byte; +// Or xml.Marshal errors. func Encode(in interface{}) ([]byte, error) { + // TODO: wire in context. + _, r := observability.NewReporter(context.Background(), ReportEncode) + b, err := obsEncode(in) + if err != nil { + r.Error() + } else { + r.OK() + } + return b, err +} + +func obsEncode(in interface{}) ([]byte, error) { if b, ok := in.([]byte); ok { // check to see if it is a pre-encoded byte string. if len(b) > 0 && b[0] == byte('"') { diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/doc.go new file mode 100644 index 00000000000..e7e013d3f02 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/doc.go @@ -0,0 +1,4 @@ +/* +Package datacodec/xml holds the encoder/decoder implementation for `application/xml`. +*/ +package xml diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/observability.go new file mode 100644 index 00000000000..4948c9be477 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml/observability.go @@ -0,0 +1,54 @@ +package xml + +import ( + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64("datacodec/xml/latency", "The latency in milliseconds for the CloudEvents xml data codec methods.", "ms") +) + +var ( + LatencyView = &view.View{ + Name: "datacodec/xml/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of the xml data codec for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportEncode Observed = iota + ReportDecode +) + +func (o Observed) TraceName() string { + switch o { + case ReportEncode: + return "datacodec/xml/encode" + case ReportDecode: + return "datacodec/xml/decode" + default: + return "datacodec/xml/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportEncode: + return "encode" + case ReportDecode: + return "decode" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/doc.go index cfb6eac590c..cc2201da915 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/doc.go @@ -1,86 +1,4 @@ /* Package cloudevents provides primitives to work with CloudEvents specification: https://github.com/cloudevents/spec. - - -Parsing Event from HTTP Request: - // req is *http.Request - event, err := cloudEvents.FromHTTPRequest(req) - if err != nil { - panic("Unable to parse event from http Request: " + err.String()) - } - - -Creating a minimal CloudEvent in version 0.1: - import "github.com/cloudevents/sdk-go/pkg/cloudevents/v01" - event := v01.Event{ - EventType: "com.example.file.created", - Source: "/providers/Example.COM/storage/account#fileServices/default/{new-file}", - EventID: "ea35b24ede421", - } - - -The goal of this package is to provide support for all released versions of CloudEvents, ideally while maintaining -the same API. It will use semantic versioning with following rules: -* MAJOR version increments when backwards incompatible changes is introduced. -* MINOR version increments when backwards compatible feature is introduced INCLUDING support for new CloudEvents version. -* PATCH version increments when a backwards compatible bug fix is introduced. */ package cloudevents - -/* - -New plan. - -Everything gets converted into the Canonical form of the event, this -then can select a transport, the transport provides encodings. - -At the moment we have cloudevents.[v01, v02] - -Canonical form holds an encoded data packet that takes in a provided Codec - -Canonical form has two members: Context, and Data - - -Sending: -cloudevents.[v01, v02] -> { Codec.Encode -> HttpMessage -> Transport[Http] } - -Receiving: -{ Transport[Http] -> HttpMessage -> Codec.Decode } -> cloudevents.[v01, v02] - -Note: Transport and Codec are grouped. - -Transport Codecs supported: -[Binary, Structured, StructuredMirrorHeaders] - - -## Working with inner data: -cloudevents.[v01, v02].Decode(DataCodec) -> custom data - -Working with inner data, - -Sending: -cloudevents.[v01, v02].data -> DataCodec.Decode -> custom data - -Receiving: -custom data -> DataCodec.Encode -> cloudevents.[v01, v02].data,contentType - -Data Codecs supported: -[json, xml, base64, text] - - -This imples that there is only one canonical form and it evolves and is marked -deprecated as the model evolves. - -Spec says: - -Event[Context, Data] -> Message - -Http Message should have: - -Headers -Body -ContentType -ContentLength - - -*/ diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/event.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/event.go index 5964d612484..4389a5de9b9 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/event.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/event.go @@ -1,8 +1,11 @@ package cloudevents import ( + "bytes" + "encoding/json" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec" + "sort" "strings" ) @@ -43,31 +46,111 @@ func (e Event) Validate() error { } func (e Event) String() string { - sb := strings.Builder{} + b := strings.Builder{} - if s := e.SpecVersion(); s != "" { - if sb.Len() > 0 { - sb.WriteString("\n") - } - sb.WriteString("SpecVersion: ") - sb.WriteString(s) + b.WriteString("Validation: ") + + valid := e.Validate() + if valid == nil { + b.WriteString("valid\n") + } else { + b.WriteString("invalid\n") + } + if valid != nil { + b.WriteString(fmt.Sprintf("Validation Error: \n%s\n", valid.Error())) } - if s := e.Type(); s != "" { - if sb.Len() > 0 { - sb.WriteString("\n") + b.WriteString("Context Attributes,\n") + + var extensions map[string]interface{} + + switch e.SpecVersion() { + case CloudEventsVersionV01: + if ec, ok := e.Context.(EventContextV01); ok { + b.WriteString(" cloudEventsVersion: " + ec.CloudEventsVersion + "\n") + b.WriteString(" eventType: " + ec.EventType + "\n") + if ec.EventTypeVersion != nil { + b.WriteString(" eventTypeVersion: " + *ec.EventTypeVersion + "\n") + } + b.WriteString(" source: " + ec.Source.String() + "\n") + b.WriteString(" eventID: " + ec.EventID + "\n") + if ec.EventTime != nil { + b.WriteString(" eventTime: " + ec.EventTime.String() + "\n") + } + if ec.SchemaURL != nil { + b.WriteString(" schemaURL: " + ec.SchemaURL.String() + "\n") + } + if ec.ContentType != nil { + b.WriteString(" contentType: " + *ec.ContentType + "\n") + } + extensions = ec.Extensions + } + + case CloudEventsVersionV02: + if ec, ok := e.Context.(EventContextV02); ok { + b.WriteString(" specversion: " + ec.SpecVersion + "\n") + b.WriteString(" type: " + ec.Type + "\n") + b.WriteString(" source: " + ec.Source.String() + "\n") + b.WriteString(" id: " + ec.ID + "\n") + if ec.Time != nil { + b.WriteString(" time: " + ec.Time.String() + "\n") + } + if ec.SchemaURL != nil { + b.WriteString(" schemaurl: " + ec.SchemaURL.String() + "\n") + } + if ec.ContentType != nil { + b.WriteString(" contenttype: " + *ec.ContentType + "\n") + } + extensions = ec.Extensions + } + + case CloudEventsVersionV03: + if ec, ok := e.Context.(EventContextV03); ok { + b.WriteString(" specversion: " + ec.SpecVersion + "\n") + b.WriteString(" type: " + ec.Type + "\n") + b.WriteString(" source: " + ec.Source.String() + "\n") + b.WriteString(" id: " + ec.ID + "\n") + if ec.Time != nil { + b.WriteString(" time: " + ec.Time.String() + "\n") + } + if ec.SchemaURL != nil { + b.WriteString(" schemaurl: " + ec.SchemaURL.String() + "\n") + } + if ec.DataContentType != nil { + b.WriteString(" datacontenttype: " + *ec.DataContentType + "\n") + } + extensions = ec.Extensions } - sb.WriteString("Type: ") - sb.WriteString(s) + default: + b.WriteString(e.String() + "\n") } - if s := e.DataContentType(); s != "" { - if sb.Len() > 0 { - sb.WriteString("\n") + if extensions != nil && len(extensions) > 0 { + b.WriteString("Extensions,\n") + keys := make([]string, 0, len(extensions)) + for k := range extensions { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + b.WriteString(fmt.Sprintf(" %s: %v\n", key, extensions[key])) } - sb.WriteString("DataContentType: ") - sb.WriteString(s) } - return sb.String() + if e.Data != nil { + b.WriteString("Data,\n ") + if strings.HasPrefix(e.DataContentType(), "application/json") { + var prettyJSON bytes.Buffer + err := json.Indent(&prettyJSON, e.Data.([]byte), " ", " ") + if err != nil { + b.Write(e.Data.([]byte)) + } else { + b.Write(prettyJSON.Bytes()) + } + } else { + b.Write(e.Data.([]byte)) + } + b.WriteString("\n") + } + return b.String() } diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/keys.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/keys.go new file mode 100644 index 00000000000..d9f961e86c7 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/keys.go @@ -0,0 +1,15 @@ +package observability + +import ( + "go.opencensus.io/tag" +) + +var ( + KeyMethod, _ = tag.NewKey("method") + KeyResult, _ = tag.NewKey("result") +) + +const ( + ResultError = "error" + ResultOK = "success" +) diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/observer.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/observer.go new file mode 100644 index 00000000000..944059b5c37 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/observability/observer.go @@ -0,0 +1,88 @@ +package observability + +import ( + "context" + "sync" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opencensus.io/trace" +) + +// Observable represents the the customization used by the Reporter for a given +// measurement and trace for a single method. +type Observable interface { + TraceName() string + MethodName() string + LatencyMs() *stats.Float64Measure +} + +// Reporter represents a running latency counter and trace span. When Error or +// OK are called, the latency is calculated and the trace space is ended. Error +// or OK are only allowed to be called once. +type Reporter interface { + Error() + OK() +} + +type reporter struct { + ctx context.Context + span *trace.Span + on Observable + start time.Time + measure stats.Measure + once sync.Once +} + +// All tags used for Latency measurements. +func LatencyTags() []tag.Key { + return []tag.Key{KeyMethod, KeyResult} +} + +func NewReporter(ctx context.Context, on Observable) (context.Context, Reporter) { + ctx, span := trace.StartSpan(ctx, on.TraceName()) + r := &reporter{ + ctx: ctx, + on: on, + span: span, + start: time.Now(), + } + r.tagMethod() + return ctx, r +} + +func (r *reporter) tagMethod() { + var err error + r.ctx, err = tag.New(r.ctx, tag.Insert(KeyMethod, r.on.MethodName())) + if err != nil { + panic(err) // or ignore? + } +} + +func (r *reporter) record() { + ms := float64(time.Since(r.start) / time.Millisecond) + stats.Record(r.ctx, r.on.LatencyMs().M(ms)) + r.span.End() +} + +func (r *reporter) Error() { + r.once.Do(func() { + r.result(ResultError) + }) +} + +func (r *reporter) OK() { + r.once.Do(func() { + r.result(ResultOK) + }) +} + +func (r *reporter) result(v string) { + var err error + r.ctx, err = tag.New(r.ctx, tag.Insert(KeyResult, v)) + if err != nil { + panic(err) // or ignore? + } + r.record() +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/doc.go new file mode 100644 index 00000000000..b93cd60a8cf --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/doc.go @@ -0,0 +1,5 @@ +/* +Package transport is the toplevel package to define interfaces that the client and codec packages use to decouple from +the transport implementations. +*/ +package transport diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v01.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v01.go index 9be36a4b2d4..7a23d707d6e 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v01.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v01.go @@ -1,10 +1,12 @@ package http import ( + "context" "encoding/json" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/codec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" "log" @@ -20,6 +22,18 @@ type CodecV01 struct { var _ transport.Codec = (*CodecV01)(nil) func (v CodecV01) Encode(e cloudevents.Event) (transport.Message, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, c: v.Encoding.Codec()}) + m, err := v.obsEncode(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return m, err +} + +func (v CodecV01) obsEncode(e cloudevents.Event) (transport.Message, error) { switch v.Encoding { case Default: fallthrough @@ -33,6 +47,18 @@ func (v CodecV01) Encode(e cloudevents.Event) (transport.Message, error) { } func (v CodecV01) Decode(msg transport.Message) (*cloudevents.Event, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, c: v.inspectEncoding(msg).Codec()}) // TODO: inspectEncoding is not free. + e, err := v.obsDecode(msg) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func (v CodecV01) obsDecode(msg transport.Message) (*cloudevents.Event, error) { switch v.inspectEncoding(msg) { case BinaryV01: return v.decodeBinary(msg) diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v02.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v02.go index 4c10ac9868c..5377a973494 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v02.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v02.go @@ -1,10 +1,12 @@ package http import ( + "context" "encoding/json" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/codec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" "log" @@ -20,6 +22,18 @@ type CodecV02 struct { var _ transport.Codec = (*CodecV02)(nil) func (v CodecV02) Encode(e cloudevents.Event) (transport.Message, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, c: v.Encoding.Codec()}) + m, err := v.obsEncode(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return m, err +} + +func (v CodecV02) obsEncode(e cloudevents.Event) (transport.Message, error) { switch v.Encoding { case Default: fallthrough @@ -33,6 +47,18 @@ func (v CodecV02) Encode(e cloudevents.Event) (transport.Message, error) { } func (v CodecV02) Decode(msg transport.Message) (*cloudevents.Event, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, c: v.inspectEncoding(msg).Codec()}) // TODO: inspectEncoding is not free. + e, err := v.obsDecode(msg) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func (v CodecV02) obsDecode(msg transport.Message) (*cloudevents.Event, error) { switch v.inspectEncoding(msg) { case BinaryV02: return v.decodeBinary(msg) diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v03.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v03.go index 78cfd2011d5..0ceee6aa90f 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v03.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/codec_v03.go @@ -1,10 +1,12 @@ package http import ( + "context" "encoding/json" "fmt" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/codec" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" "log" @@ -20,6 +22,18 @@ type CodecV03 struct { var _ transport.Codec = (*CodecV03)(nil) func (v CodecV03) Encode(e cloudevents.Event) (transport.Message, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportEncode, c: v.Encoding.Codec()}) + m, err := v.obsEncode(e) + if err != nil { + r.Error() + } else { + r.OK() + } + return m, err +} + +func (v CodecV03) obsEncode(e cloudevents.Event) (transport.Message, error) { switch v.Encoding { case Default: fallthrough @@ -35,6 +49,18 @@ func (v CodecV03) Encode(e cloudevents.Event) (transport.Message, error) { } func (v CodecV03) Decode(msg transport.Message) (*cloudevents.Event, error) { + // TODO: wire context + _, r := observability.NewReporter(context.Background(), CodecObserved{o: ReportDecode, c: v.inspectEncoding(msg).Codec()}) // TODO: inspectEncoding is not free. + e, err := v.obsDecode(msg) + if err != nil { + r.Error() + } else { + r.OK() + } + return e, err +} + +func (v CodecV03) obsDecode(msg transport.Message) (*cloudevents.Event, error) { switch v.inspectEncoding(msg) { case BinaryV03: return v.decodeBinary(msg) diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/context.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/context.go index 70ac8de8ec0..b0a397e210e 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/context.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/context.go @@ -2,6 +2,9 @@ package http import ( "context" + "fmt" + "net/http" + "strings" ) // TransportContext allows a Receiver to understand the context of a request. @@ -9,6 +12,92 @@ type TransportContext struct { URI string Host string Method string + Header http.Header + + // IgnoreHeaderPrefixes controls what comes back from AttendToHeaders. + // AttendToHeaders controls what is output for .String() + IgnoreHeaderPrefixes []string +} + +func NewTransportContext(req *http.Request) TransportContext { + var tx *TransportContext + if req != nil { + tx = &TransportContext{ + URI: req.RequestURI, + Host: req.Host, + Method: req.Method, + Header: req.Header, + } + } else { + tx = &TransportContext{} + } + tx.AddIgnoreHeaderPrefix("accept-encoding", "user-agent", "connection", "content-type") + return *tx +} + +// AttendToHeaders returns the list of headers that exist in the TransportContext that are not currently in +// tx.IgnoreHeaderPrefix. +func (tx TransportContext) AttendToHeaders() []string { + a := []string(nil) + if tx.Header != nil && len(tx.Header) > 0 { + for k := range tx.Header { + if tx.shouldIgnoreHeader(k) { + continue + } + a = append(a, k) + } + } + return a +} + +func (tx TransportContext) shouldIgnoreHeader(h string) bool { + for _, v := range tx.IgnoreHeaderPrefixes { + if strings.HasPrefix(strings.ToLower(h), strings.ToLower(v)) { + return true + } + } + return false +} + +// String generates a pretty-printed version of the resource as a string. +func (tx TransportContext) String() string { + b := strings.Builder{} + + b.WriteString("Transport Context,\n") + + empty := b.Len() + + if tx.URI != "" { + b.WriteString(" URI: " + tx.URI + "\n") + } + if tx.Host != "" { + b.WriteString(" Host: " + tx.Host + "\n") + } + + if tx.Method != "" { + b.WriteString(" Method: " + tx.Method + "\n") + } + + if tx.Header != nil && len(tx.Header) > 0 { + b.WriteString(" Header:\n") + for _, k := range tx.AttendToHeaders() { + b.WriteString(fmt.Sprintf(" %s: %s\n", k, tx.Header.Get(k))) + } + } + + if b.Len() == empty { + b.WriteString(" nil\n") + } + + return b.String() +} + +// AddIgnoreHeaderPrefix controls what header key is to be attended to and/or printed. +func (tx *TransportContext) AddIgnoreHeaderPrefix(prefix ...string) { + if tx.IgnoreHeaderPrefixes == nil { + tx.IgnoreHeaderPrefixes = []string(nil) + } + tx.IgnoreHeaderPrefixes = append(tx.IgnoreHeaderPrefixes, prefix...) } // Opaque key type used to store TransportContext @@ -16,6 +105,7 @@ type transportContextKeyType struct{} var transportContextKey = transportContextKeyType{} +// WithTransportContext return a context with the given TransportContext into the provided context object. func WithTransportContext(ctx context.Context, tcxt TransportContext) context.Context { return context.WithValue(ctx, transportContextKey, tcxt) } @@ -32,3 +122,27 @@ func TransportContextFrom(ctx context.Context) TransportContext { } return TransportContext{} } + +// Opaque key type used to store Headers +type headerKeyType struct{} + +var headerKey = headerKeyType{} + +// ContextWithHeader returns a context with a header added to the given context. +// Can be called multiple times to set multiple header key/value pairs. +func ContextWithHeader(ctx context.Context, key, value string) context.Context { + header := HeaderFrom(ctx) + header.Add(key, value) + return context.WithValue(ctx, headerKey, header) +} + +// HeaderFrom extracts the header oject in the given context. Always returns a non-nil Header. +func HeaderFrom(ctx context.Context) http.Header { + header := ctx.Value(headerKey) + if header != nil { + if h, ok := header.(http.Header); ok { + return h + } + } + return http.Header{} +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/doc.go new file mode 100644 index 00000000000..7a8df0e88a2 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/doc.go @@ -0,0 +1,4 @@ +/* +Package transport/http implements the CloudEvent transport implementation using HTTP. +*/ +package http diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/encoding.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/encoding.go index e16483f9a15..92017cbf372 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/encoding.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/encoding.go @@ -74,3 +74,34 @@ func (e Encoding) Version() string { return "Unknown" } } + +func (e Encoding) Codec() string { + switch e { + case Default: + return "default" + + // Version 0.1 + case BinaryV01: + return "binary/v0.1" + case StructuredV01: + return "structured/v0.1" + + // Version 0.2 + case BinaryV02: + return "binary/v0.3" + case StructuredV02: + return "structured/v0.2" + + // Version 0.3 + case BinaryV03: + return "binary/v0.3" + case StructuredV03: + return "structured/v0.3" + case BatchedV03: + return "batched/v0.3" + + // Unknown + default: + return "unknown" + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/observability.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/observability.go new file mode 100644 index 00000000000..b87ca8dd998 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/observability.go @@ -0,0 +1,93 @@ +package http + +import ( + "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +var ( + LatencyMs = stats.Float64( + "transport/http/latency", + "The latency in milliseconds for the http transport methods for CloudEvents.", + "ms") +) + +var ( + LatencyView = &view.View{ + Name: "transport/http/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of http transport for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type Observed int32 + +const ( + ReportSend Observed = iota + ReportReceive + ReportServeHTTP + ReportEncode + ReportDecode +) + +func (o Observed) TraceName() string { + switch o { + case ReportSend: + return "transport/http/send" + case ReportReceive: + return "transport/http/receive" + case ReportServeHTTP: + return "transport/http/servehttp" + case ReportEncode: + return "transport/http/encode" + case ReportDecode: + return "transport/http/decode" + default: + return "transport/http/unknown" + } +} + +func (o Observed) MethodName() string { + switch o { + case ReportSend: + return "send" + case ReportReceive: + return "receive" + case ReportServeHTTP: + return "servehttp" + case ReportEncode: + return "encode" + case ReportDecode: + return "decode" + default: + return "unknown" + } +} + +func (o Observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} + +// CodecObserved is a wrapper to append version to Observed. +type CodecObserved struct { + // Method + o Observed + // Codec + c string +} + +func (c CodecObserved) TraceName() string { + return fmt.Sprintf("%s/%s", c.o.TraceName(), c.c) +} + +func (c CodecObserved) MethodName() string { + return fmt.Sprintf("%s/%s", c.o.MethodName(), c.c) +} + +func (c CodecObserved) LatencyMs() *stats.Float64Measure { + return c.o.LatencyMs() +} diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/options.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/options.go index ea54869c746..0f8e70901b2 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/options.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/options.go @@ -5,6 +5,7 @@ import ( nethttp "net/http" "net/url" "strings" + "time" ) type Option func(*Transport) error @@ -56,6 +57,39 @@ func WithMethod(method string) Option { } } +// WithHeader sets an additional default outbound header for all cloudevents +// when using an HTTP request. +func WithHeader(key, value string) Option { + return func(t *Transport) error { + if t == nil { + return fmt.Errorf("http header option can not set nil transport") + } + key = strings.TrimSpace(key) + if key != "" { + if t.Req == nil { + t.Req = &nethttp.Request{} + } + if t.Req.Header == nil { + t.Req.Header = nethttp.Header{} + } + t.Req.Header.Add(key, value) + return nil + } + return fmt.Errorf("http header option was empty string") + } +} + +// WithShutdownTimeout sets the shutdown timeout when the http server is being shutdown. +func WithShutdownTimeout(timeout time.Duration) Option { + return func(t *Transport) error { + if t == nil { + return fmt.Errorf("http shutdown timeout option can not set nil transport") + } + t.ShutdownTimeout = &timeout + return nil + } +} + // WithEncoding sets the encoding for clients with HTTP transports. func WithEncoding(encoding Encoding) Option { return func(t *Transport) error { diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/transport.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/transport.go index 8f0f8d5dcf7..90b1c2ec8ef 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/transport.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http/transport.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "fmt" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "io/ioutil" "log" "net" "net/http" "strings" "sync" + "time" "github.com/cloudevents/sdk-go/pkg/cloudevents" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" @@ -21,11 +23,20 @@ type EncodingSelector func(e cloudevents.Event) Encoding // type check that this transport message impl matches the contract var _ transport.Transport = (*Transport)(nil) +const ( + // DefaultShutdownTimeout defines the default timeout given to the http.Server when calling Shutdown. + DefaultShutdownTimeout = time.Minute * 1 +) + // Transport acts as both a http client and a http handler. type Transport struct { Encoding Encoding DefaultEncodingSelectionFn EncodingSelector + // ShutdownTimeout defines the timeout given to the http.Server when calling Shutdown. + // If nil, DefaultShutdownTimeout is used. + ShutdownTimeout *time.Duration + // Sending Client *http.Client Req *http.Request @@ -82,17 +93,42 @@ func (t *Transport) loadCodec() bool { return true } +func copyHeaders(from, to http.Header) { + if from == nil || to == nil { + return + } + for header, values := range from { + for _, value := range values { + to.Add(header, value) + } + } +} + func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { + ctx, r := observability.NewReporter(ctx, ReportSend) + resp, err := t.obsSend(ctx, event) + if err != nil { + r.Error() + } else { + r.OK() + } + return resp, err +} + +func (t *Transport) obsSend(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { if t.Client == nil { t.crMu.Lock() t.Client = &http.Client{} t.crMu.Unlock() } - var req http.Request + req := http.Request{ + Header: HeaderFrom(ctx), + } if t.Req != nil { req.Method = t.Req.Method req.URL = t.Req.URL + copyHeaders(t.Req.Header, req.Header) } // Override the default request with target from context. @@ -109,11 +145,11 @@ func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudev return nil, err } - // TODO: merge the incoming request with msg, for now just replace. if m, ok := msg.(*Message); ok { - req.Header = m.Header + copyHeaders(m.Header, req.Header) req.Body = ioutil.NopCloser(bytes.NewBuffer(m.Body)) req.ContentLength = int64(len(m.Body)) + req.Close = true return httpDo(ctx, &req, func(resp *http.Response, err error) (*cloudevents.Event, error) { if err != nil { return nil, err @@ -191,7 +227,13 @@ func (t *Transport) StartReceiver(ctx context.Context) error { select { case <-ctx.Done(): // Try a gracefully shutdown. - return t.server.Shutdown(context.Background()) + timeout := DefaultShutdownTimeout + if t.ShutdownTimeout != nil { + timeout = *t.ShutdownTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return t.server.Shutdown(ctx) case err := <-errChan: return err } @@ -238,6 +280,17 @@ func status(resp *http.Response) string { } func (t *Transport) invokeReceiver(ctx context.Context, event cloudevents.Event) (*Response, error) { + ctx, r := observability.NewReporter(ctx, ReportReceive) + resp, err := t.obsInvokeReceiver(ctx, event) + if err != nil { + r.Error() + } else { + r.OK() + } + return resp, err +} + +func (t *Transport) obsInvokeReceiver(ctx context.Context, event cloudevents.Event) (*Response, error) { if t.Receiver != nil { // Note: http does not use eventResp.Reason eventResp := cloudevents.EventResponse{} @@ -277,11 +330,14 @@ func (t *Transport) invokeReceiver(ctx context.Context, event cloudevents.Event) // ServeHTTP implements http.Handler func (t *Transport) ServeHTTP(w http.ResponseWriter, req *http.Request) { + ctx, r := observability.NewReporter(req.Context(), ReportServeHTTP) + body, err := ioutil.ReadAll(req.Body) if err != nil { log.Printf("failed to handle request: %s %v", err, req) w.WriteHeader(http.StatusBadRequest) w.Write([]byte(`{"error":"Invalid request"}`)) + r.Error() return } msg := &Message{ @@ -294,6 +350,7 @@ func (t *Transport) ServeHTTP(w http.ResponseWriter, req *http.Request) { log.Printf("failed to load codec: %s", err) w.WriteHeader(http.StatusBadRequest) w.Write([]byte(fmt.Sprintf(`{"error":%q}`, err.Error()))) + r.Error() return } event, err := t.codec.Decode(msg) @@ -301,23 +358,19 @@ func (t *Transport) ServeHTTP(w http.ResponseWriter, req *http.Request) { log.Printf("failed to decode message: %s %v", err, req) w.WriteHeader(http.StatusBadRequest) w.Write([]byte(fmt.Sprintf(`{"error":%q}`, err.Error()))) + r.Error() return } - ctx := req.Context() - if req != nil { - ctx = WithTransportContext(ctx, TransportContext{ - URI: req.RequestURI, - Host: req.Host, - Method: req.Method, - }) + ctx = WithTransportContext(ctx, NewTransportContext(req)) } resp, err := t.invokeReceiver(ctx, *event) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(fmt.Sprintf(`{"error":%q}`, err.Error()))) + r.Error() return } if resp != nil { @@ -328,16 +381,24 @@ func (t *Transport) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } } + status := http.StatusAccepted if resp.StatusCode >= 200 && resp.StatusCode < 600 { - w.WriteHeader(resp.StatusCode) + status = resp.StatusCode } + w.WriteHeader(status) if len(resp.Body) > 0 { - w.Write(resp.Body) + if _, err := w.Write(resp.Body); err != nil { + r.Error() + return + } } + + r.OK() return } w.WriteHeader(http.StatusNoContent) + r.OK() } func (t *Transport) GetPort() int { diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/doc.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/doc.go new file mode 100644 index 00000000000..1019b4a2dd2 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/doc.go @@ -0,0 +1,4 @@ +/* +Package types provides custom types to support CloudEvents. +*/ +package types diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/timestamp.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/timestamp.go index c87e24804d3..c11e4e15a6d 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/timestamp.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/timestamp.go @@ -7,10 +7,15 @@ import ( "time" ) +// Timestamp wraps time.Time to normalize the time layout to RFC3339. It is +// intended to enforce compliance with the CloudEvents spec for their +// definition of Timestamp. Custom marshal methods are implemented to ensure +// the outbound Timestamp is a string in the RFC3339 layout. type Timestamp struct { time.Time } +// ParseTimestamp attempts to parse the given time assuming RFC3339 layout func ParseTimestamp(t string) *Timestamp { if t == "" { return nil @@ -22,7 +27,8 @@ func ParseTimestamp(t string) *Timestamp { return &Timestamp{Time: timestamp} } -// This allows json marshaling to always be in RFC3339Nano format. +// MarshalJSON implements a custom json marshal method used when this type is +// marshaled using json.Marshal. func (t *Timestamp) MarshalJSON() ([]byte, error) { if t == nil || t.IsZero() { return []byte(`""`), nil @@ -31,6 +37,8 @@ func (t *Timestamp) MarshalJSON() ([]byte, error) { return []byte(rfc3339), nil } +// UnmarshalJSON implements the json unmarshal method used when this type is +// unmarshed using json.Unmarshal. func (t *Timestamp) UnmarshalJSON(b []byte) error { var timestamp string if err := json.Unmarshal(b, ×tamp); err != nil { @@ -42,6 +50,8 @@ func (t *Timestamp) UnmarshalJSON(b []byte) error { return nil } +// MarshalXML implements a custom xml marshal method used when this type is +// marshaled using xml.Marshal. func (t *Timestamp) MarshalXML(e *xml.Encoder, start xml.StartElement) error { if t == nil || t.IsZero() { return e.EncodeElement(nil, start) @@ -50,6 +60,8 @@ func (t *Timestamp) MarshalXML(e *xml.Encoder, start xml.StartElement) error { return e.EncodeElement(v, start) } +// UnmarshalXML implements the xml unmarshal method used when this type is +// unmarshed using xml.Unmarshal. func (t *Timestamp) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var timestamp string if err := d.DecodeElement(×tamp, &start); err != nil { @@ -61,6 +73,7 @@ func (t *Timestamp) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { return nil } +// String outputs the time using layout RFC3339. func (t *Timestamp) String() string { if t == nil { return time.Time{}.UTC().Format(time.RFC3339Nano) diff --git a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/urlref.go b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/urlref.go index f79fafa0860..d1cc2703645 100644 --- a/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/urlref.go +++ b/vendor/github.com/cloudevents/sdk-go/pkg/cloudevents/types/urlref.go @@ -7,10 +7,15 @@ import ( "net/url" ) +// URLRef is a wrapper to url.URL. It is intended to enforce compliance with +// the CloudEvents spec for their definition of URI-Reference. Custom +// marshal methods are implemented to ensure the outbound URLRef object is +// is a flat string. type URLRef struct { url.URL } +// ParseURLRef attempts to parse the given string as a URI-Reference. func ParseURLRef(u string) *URLRef { if u == "" { return nil @@ -22,12 +27,15 @@ func ParseURLRef(u string) *URLRef { return &URLRef{URL: *pu} } -// This allows json marshaling to always be in RFC3339Nano format. +// MarshalJSON implements a custom json marshal method used when this type is +// marshaled using json.Marshal. func (u URLRef) MarshalJSON() ([]byte, error) { b := fmt.Sprintf("%q", u.String()) return []byte(b), nil } +// UnmarshalJSON implements the json unmarshal method used when this type is +// unmarshed using json.Unmarshal. func (u *URLRef) UnmarshalJSON(b []byte) error { var ref string if err := json.Unmarshal(b, &ref); err != nil { @@ -40,11 +48,15 @@ func (u *URLRef) UnmarshalJSON(b []byte) error { return nil } +// MarshalXML implements a custom xml marshal method used when this type is +// marshaled using xml.Marshal. func (u URLRef) MarshalXML(e *xml.Encoder, start xml.StartElement) error { v := fmt.Sprintf("%s", u.String()) return e.EncodeElement(v, start) } +// UnmarshalXML implements the xml unmarshal method used when this type is +// unmarshed using xml.Unmarshal. func (u *URLRef) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var ref string if err := d.DecodeElement(&ref, &start); err != nil { @@ -57,6 +69,7 @@ func (u *URLRef) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { return nil } +// String returns the full string representation of the URI-Reference. func (u *URLRef) String() string { if u == nil { return ""