diff --git a/go.mod b/go.mod index 09b5ed88278..5367c9218b4 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.1 // indirect - github.com/cloudevents/sdk-go/v2 v2.1.0 + github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.3.5 github.com/google/go-cmp v0.4.0 diff --git a/go.sum b/go.sum index d35d261ae18..3784ca59588 100644 --- a/go.sum +++ b/go.sum @@ -205,8 +205,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZWMLGg3om8SswStBoY6/SHvcLM19MuZqwDtMtmzs= github.com/cloudevents/sdk-go v1.0.0 h1:gS5I0s2qPmdc4GBPlUmzZU7RH30BaiOdcRJ1RkXnPrc= github.com/cloudevents/sdk-go v1.0.0/go.mod h1:3TkmM0cFqkhCHOq5JzzRU/RxRkwzoS8TZ+G448qVTog= -github.com/cloudevents/sdk-go/v2 v2.1.0 h1:bmgrU8k+K2ppZ+G/q5xEQx/Xk9HRtJmkrEO3qtDO2k0= -github.com/cloudevents/sdk-go/v2 v2.1.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= +github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe h1:EY9DO05JZ+rMTUjJ7eYqjr+n2ZpLqE4UBeSeL2OH+v8= +github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko= diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go index b9a4d8f8e36..597a2c6884b 100644 --- a/test/conformance/helpers/broker_tracing_test_helper.go +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -30,7 +30,7 @@ import ( tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) // BrokerTracingTestHelperWithChannelTestRunner runs the Broker tracing tests for all Channels in diff --git a/test/conformance/helpers/channel_header_single_event_helper.go b/test/conformance/helpers/channel_header_single_event_helper.go index 025388c2a55..d831df4d66b 100644 --- a/test/conformance/helpers/channel_header_single_event_helper.go +++ b/test/conformance/helpers/channel_header_single_event_helper.go @@ -28,7 +28,7 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) /* diff --git a/test/conformance/helpers/channel_message_modes_specversion_helper.go b/test/conformance/helpers/channel_message_modes_specversion_helper.go index 4161a58f36d..9bea1add21d 100644 --- a/test/conformance/helpers/channel_message_modes_specversion_helper.go +++ b/test/conformance/helpers/channel_message_modes_specversion_helper.go @@ -17,6 +17,7 @@ limitations under the License. package helpers import ( + "net/http" "strings" "testing" @@ -28,7 +29,7 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) // ChannelMessageModesAndSpecVersionsTestRunner tests the support of the channel ingress for different spec versions and message modes @@ -97,6 +98,7 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo &channel, event, sender.WithEncoding(encoding), + sender.WithResponseSink("http://"+client.GetServiceHost(subscriberName)), ) matchers := []EventMatcher{HasExactlyAttributesEqualTo(event.Context)} @@ -125,4 +127,9 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo recordevents.NoError(), recordevents.MatchEvent(matchers...), ) + + eventTracker.AssertExact( + 1, + recordevents.MatchEvent(sender.MatchStatusCode(http.StatusAccepted)), + ) } diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 190ca8bf4ed..2447971b1cb 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -29,7 +29,7 @@ import ( tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) // ChannelTracingTestHelperWithChannelTestRunner runs the Channel tracing tests for all Channels in diff --git a/test/e2e/helpers/channel_single_event_helper.go b/test/e2e/helpers/channel_single_event_helper.go index c438b9c3253..bdd81e27323 100644 --- a/test/e2e/helpers/channel_single_event_helper.go +++ b/test/e2e/helpers/channel_single_event_helper.go @@ -25,10 +25,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cloudevents "github.com/cloudevents/sdk-go/v2" + testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) type SubscriptionVersion string diff --git a/test/lib/creation.go b/test/lib/creation.go index ea757169b90..ecd05e33346 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -346,6 +346,11 @@ func (c *Client) CreatePodOrFail(pod *corev1.Pod, options ...func(*corev1.Pod, * c.podsCreated = append(c.podsCreated, pod.Name) } +// GetServiceHost returns the service hostname for the specified podName +func (c *Client) GetServiceHost(podName string) string { + return fmt.Sprintf("%s.%s.svc", podName, c.Namespace) +} + // CreateDeploymentOrFail will create a Deployment or fail the test if there is an error. func (c *Client) CreateDeploymentOrFail(deploy *appsv1.Deployment, options ...func(*appsv1.Deployment, *Client) error) { // set namespace for the deploy in case it's empty diff --git a/test/lib/recordevents/event_info_store.go b/test/lib/recordevents/event_info_store.go index 20f082a7dbb..18cfc48d095 100644 --- a/test/lib/recordevents/event_info_store.go +++ b/test/lib/recordevents/event_info_store.go @@ -18,6 +18,7 @@ package recordevents import ( "fmt" + "strconv" "strings" "sync" "testing" @@ -307,7 +308,8 @@ func (ei *EventInfoStore) waitAtLeastNMatch(f EventInfoMatcher, min int) ([]Even func formatErrors(errs []error) string { var sb strings.Builder - for _, err := range errs { + for i, err := range errs { + sb.WriteString(strconv.Itoa(i) + " - ") sb.WriteString(err.Error()) sb.WriteRune('\n') } diff --git a/test/lib/send_event.go b/test/lib/send_event.go index e8e7d66f65d..171cfce5693 100644 --- a/test/lib/send_event.go +++ b/test/lib/send_event.go @@ -23,7 +23,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" - "knative.dev/eventing/test/lib/resources/sender" + "knative.dev/eventing/test/lib/sender" ) // SendEventToAddressable will send the given event to the given Addressable. diff --git a/test/lib/sender/events.go b/test/lib/sender/events.go new file mode 100644 index 00000000000..f01b53e709f --- /dev/null +++ b/test/lib/sender/events.go @@ -0,0 +1,48 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sender + +import ( + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +const ( + EventType = "sender.test.knative.dev" + ResponseStatusCodeExtension = "responsestatuscode" +) + +// NewSenderEvent creates a new sender event assertable with the matchers provided in this package +func NewSenderEvent(id string, source string, event *cloudevents.Event, result *cehttp.Result) cloudevents.Event { + ev := cloudevents.NewEvent() + ev.SetID(id) + ev.SetSource(source) + ev.SetType(EventType) + ev.SetTime(time.Now()) + + if result != nil { + ev.SetExtension(ResponseStatusCodeExtension, result.StatusCode) + } + + if event != nil { + _ = ev.SetData("application/json", event) + } + + return ev +} diff --git a/test/lib/sender/matchers.go b/test/lib/sender/matchers.go new file mode 100644 index 00000000000..8f76384554f --- /dev/null +++ b/test/lib/sender/matchers.go @@ -0,0 +1,56 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sender + +import ( + "errors" + "fmt" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" + cetest "github.com/cloudevents/sdk-go/v2/test" + cetypes "github.com/cloudevents/sdk-go/v2/types" +) + +// MatchStatusCode matches the response status code of the sent event +func MatchStatusCode(status int) cetest.EventMatcher { + return cetest.AllOf( + cetest.HasType(EventType), + cetest.AnyOf( + // Because extensions could lose type information during serialization + // (eg when they're transported as http headers) the assert should match or the string or the int + cetest.HasExtension(ResponseStatusCodeExtension, cetypes.FormatInteger(int32(status))), + cetest.HasExtension(ResponseStatusCodeExtension, status), + ), + ) +} + +// MatchInnerEvent matches the response event of the sent event +func MatchInnerEvent(matchers ...cetest.EventMatcher) cetest.EventMatcher { + return cetest.AllOf(cetest.HasType(EventType), func(have event.Event) error { + if have.Data() != nil { + innerEvent := cloudevents.Event{} + err := have.DataAs(&innerEvent) + if err != nil { + return fmt.Errorf("error while trying to parse inner event %w", err) + } + + return cetest.AllOf(matchers...)(innerEvent) + } + return errors.New("event doesn't contain an inner event") + }) +} diff --git a/test/lib/resources/sender/sender.go b/test/lib/sender/resources.go similarity index 90% rename from test/lib/resources/sender/sender.go rename to test/lib/sender/resources.go index 084fe886723..a13a993d333 100644 --- a/test/lib/resources/sender/sender.go +++ b/test/lib/sender/resources.go @@ -61,6 +61,17 @@ func WithEncoding(encoding cloudevents.Encoding) EventSenderOption { } } +// WithResponseSink sends the response information as CloudEvent to another sink +func WithResponseSink(responseSink string) EventSenderOption { + return func(pod *corev1.Pod) { + pod.Spec.Containers[0].Args = append( + pod.Spec.Containers[0].Args, + "-response-sink", + responseSink, + ) + } +} + // WithEncoding forces the encoding of the event to send from the sender pod func WithAdditionalHeaders(headers map[string]string) EventSenderOption { var kv []string diff --git a/test/test_images/event-sender/main.go b/test/test_images/event-sender/main.go index 1c467e2f448..03a9c0ddf8f 100644 --- a/test/test_images/event-sender/main.go +++ b/test/test_images/event-sender/main.go @@ -34,10 +34,12 @@ import ( "knative.dev/pkg/tracing/propagation/tracecontextb3" "knative.dev/eventing/pkg/tracing" + "knative.dev/eventing/test/lib/sender" ) var ( sink string + responseSink string inputEvent string eventEncoding string periodStr string @@ -51,6 +53,7 @@ var ( func init() { flag.StringVar(&sink, "sink", "", "The sink url for the message destination.") + flag.StringVar(&responseSink, "response-sink", "", "The response sink url to send the response.") flag.StringVar(&inputEvent, "event", "", "Event JSON encoded") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event: [binary, structured].") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") @@ -82,15 +85,6 @@ func main() { maxMsg = m } - defer func() { - var err error - r := recover() - if r != nil { - err = r.(error) - log.Printf("recovered from panic: %v", err) - } - }() - if delay > 0 { log.Printf("will sleep for %s", delay) time.Sleep(delay) @@ -170,10 +164,35 @@ func main() { event.SetID(fmt.Sprintf("%d", sequence)) } - if responseEvent, result := c.Request(ctx, event); !cloudevents.IsACK(result) { - log.Printf("send returned an error: %v\n", result) - } else if responseEvent != nil { - log.Printf("Got response from %s\n%s\n", sink, *responseEvent) + log.Printf("I'm going to send\n%s\n", event) + + responseEvent, responseResult := c.Request(ctx, event) + if cloudevents.IsUndelivered(responseResult) { + log.Printf("send returned an error: %v\n", responseResult) + } else { + if responseEvent != nil { + log.Printf("Got response from %s\n%s\n%s\n", sink, responseResult, *responseEvent) + } else { + log.Printf("Got response from %s\n%s\n", sink, responseResult) + } + + if responseSink != "" { + var httpResult *cehttp.Result + cloudevents.ResultAs(responseResult, &httpResult) + responseEvent := sender.NewSenderEvent( + event.ID(), + "https://knative.dev/eventing/test/event-sender", + responseEvent, + httpResult, + ) + + result2 := c.Send(cloudevents.ContextWithTarget(context.Background(), responseSink), responseEvent) + if cloudevents.IsUndelivered(result2) { + log.Printf("send to response sink returned an error: %v\n", result2) + } else { + log.Printf("Got response from %s\n%s\n", responseSink, result2) + } + } } // Wait for next tick diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 502e995d68d..98aa623f40b 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -25,6 +25,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "go.uber.org/zap" + "knative.dev/eventing/pkg/tracing" ) @@ -82,8 +83,6 @@ func main() { } c, err := cloudevents.NewClientObserved(t, - cloudevents.WithTimeNow(), - cloudevents.WithUUIDs(), cloudevents.WithTracePropagation, ) if err != nil { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go index a1b4e6a524c..61bbd90d05d 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -159,7 +159,7 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, pr // If the protocol returns no error, it is an ACK on the request, but we had // issues turning the response into an event, so make an ACK Result and pass // down the ToEvent error as well. - err = fmt.Errorf("%w; failed to convert response into event: %s", protocol.ResultACK, rserr) + err = protocol.NewReceipt(true, "failed to convert response into event: %s\n%w", rserr.Error(), err) } else { resp = rs } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/test/event_mocks.go b/vendor/github.com/cloudevents/sdk-go/v2/test/event_mocks.go index 7116662f41b..05e5675b13a 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/test/event_mocks.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/test/event_mocks.go @@ -6,6 +6,7 @@ import ( "net/url" "time" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/types" @@ -56,6 +57,18 @@ func MinEvent() event.Event { } } +// FullMessage returns the same event of FullEvent but wrapped as Message. +func FullMessage() binding.Message { + ev := FullEvent() + return binding.ToMessage(&ev) +} + +// MinMessage returns the same event of MinEvent but wrapped as Message. +func MinMessage() binding.Message { + ev := MinEvent() + return binding.ToMessage(&ev) +} + // AllVersions returns all versions of each event in events. // ID gets a -number suffix so IDs are unique. func AllVersions(events []event.Event) []event.Event { diff --git a/vendor/modules.txt b/vendor/modules.txt index 5812b2a2bff..3f3f4036973 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -76,7 +76,7 @@ github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1 github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1 github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1 github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1 -# github.com/cloudevents/sdk-go/v2 v2.1.0 +# github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe ## explicit github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding