Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.1 // indirect
github.com/cloudevents/sdk-go/v2 v2.1.0
github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.3.5
github.com/google/go-cmp v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZWMLGg3om8SswStBoY6/SHvcLM19MuZqwDtMtmzs=
github.com/cloudevents/sdk-go v1.0.0 h1:gS5I0s2qPmdc4GBPlUmzZU7RH30BaiOdcRJ1RkXnPrc=
github.com/cloudevents/sdk-go v1.0.0/go.mod h1:3TkmM0cFqkhCHOq5JzzRU/RxRkwzoS8TZ+G448qVTog=
github.com/cloudevents/sdk-go/v2 v2.1.0 h1:bmgrU8k+K2ppZ+G/q5xEQx/Xk9HRtJmkrEO3qtDO2k0=
github.com/cloudevents/sdk-go/v2 v2.1.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU=
github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe h1:EY9DO05JZ+rMTUjJ7eYqjr+n2ZpLqE4UBeSeL2OH+v8=
github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko=
Expand Down
2 changes: 1 addition & 1 deletion test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

// BrokerTracingTestHelperWithChannelTestRunner runs the Broker tracing tests for all Channels in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package helpers

import (
"net/http"
"strings"
"testing"

Expand All @@ -28,7 +29,7 @@ import (
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

// ChannelMessageModesAndSpecVersionsTestRunner tests the support of the channel ingress for different spec versions and message modes
Expand Down Expand Up @@ -97,6 +98,7 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo
&channel,
event,
sender.WithEncoding(encoding),
sender.WithResponseSink("http://"+client.GetServiceHost(subscriberName)),
)

matchers := []EventMatcher{HasExactlyAttributesEqualTo(event.Context)}
Expand Down Expand Up @@ -125,4 +127,9 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo
recordevents.NoError(),
recordevents.MatchEvent(matchers...),
)

eventTracker.AssertExact(
1,
recordevents.MatchEvent(sender.MatchStatusCode(http.StatusAccepted)),
)
}
2 changes: 1 addition & 1 deletion test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

// ChannelTracingTestHelperWithChannelTestRunner runs the Channel tracing tests for all Channels in
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/helpers/channel_single_event_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

cloudevents "github.com/cloudevents/sdk-go/v2"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

type SubscriptionVersion string
Expand Down
5 changes: 5 additions & 0 deletions test/lib/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ func (c *Client) CreatePodOrFail(pod *corev1.Pod, options ...func(*corev1.Pod, *
c.podsCreated = append(c.podsCreated, pod.Name)
}

// GetServiceHost returns the service hostname for the specified podName
func (c *Client) GetServiceHost(podName string) string {
return fmt.Sprintf("%s.%s.svc", podName, c.Namespace)
}

// CreateDeploymentOrFail will create a Deployment or fail the test if there is an error.
func (c *Client) CreateDeploymentOrFail(deploy *appsv1.Deployment, options ...func(*appsv1.Deployment, *Client) error) {
// set namespace for the deploy in case it's empty
Expand Down
4 changes: 3 additions & 1 deletion test/lib/recordevents/event_info_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package recordevents

import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -307,7 +308,8 @@ func (ei *EventInfoStore) waitAtLeastNMatch(f EventInfoMatcher, min int) ([]Even

func formatErrors(errs []error) string {
var sb strings.Builder
for _, err := range errs {
for i, err := range errs {
sb.WriteString(strconv.Itoa(i) + " - ")
sb.WriteString(err.Error())
sb.WriteRune('\n')
}
Expand Down
2 changes: 1 addition & 1 deletion test/lib/send_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"

"knative.dev/eventing/test/lib/resources/sender"
"knative.dev/eventing/test/lib/sender"
)

// SendEventToAddressable will send the given event to the given Addressable.
Expand Down
48 changes: 48 additions & 0 deletions test/lib/sender/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sender

import (
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

const (
EventType = "sender.test.knative.dev"
ResponseStatusCodeExtension = "responsestatuscode"
)

// NewSenderEvent creates a new sender event assertable with the matchers provided in this package
func NewSenderEvent(id string, source string, event *cloudevents.Event, result *cehttp.Result) cloudevents.Event {
Comment thread
slinkydeveloper marked this conversation as resolved.
ev := cloudevents.NewEvent()
ev.SetID(id)
ev.SetSource(source)
ev.SetType(EventType)
ev.SetTime(time.Now())

if result != nil {
ev.SetExtension(ResponseStatusCodeExtension, result.StatusCode)
}

if event != nil {
_ = ev.SetData("application/json", event)
}

return ev
}
56 changes: 56 additions & 0 deletions test/lib/sender/matchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sender

import (
"errors"
"fmt"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
cetest "github.com/cloudevents/sdk-go/v2/test"
cetypes "github.com/cloudevents/sdk-go/v2/types"
)

// MatchStatusCode matches the response status code of the sent event
func MatchStatusCode(status int) cetest.EventMatcher {
Comment thread
slinkydeveloper marked this conversation as resolved.
return cetest.AllOf(
cetest.HasType(EventType),
cetest.AnyOf(
// Because extensions could lose type information during serialization
// (eg when they're transported as http headers) the assert should match or the string or the int
cetest.HasExtension(ResponseStatusCodeExtension, cetypes.FormatInteger(int32(status))),
cetest.HasExtension(ResponseStatusCodeExtension, status),
),
)
}

// MatchInnerEvent matches the response event of the sent event
func MatchInnerEvent(matchers ...cetest.EventMatcher) cetest.EventMatcher {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function used anywhere?

Copy link
Copy Markdown
Contributor Author

@slinkydeveloper slinkydeveloper Jun 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet, but i suppose we'll need it for broker data-plane tests and other tests when sender actually expects an event back

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lberk might want to take a look too ?

return cetest.AllOf(cetest.HasType(EventType), func(have event.Event) error {
if have.Data() != nil {
innerEvent := cloudevents.Event{}
err := have.DataAs(&innerEvent)
if err != nil {
return fmt.Errorf("error while trying to parse inner event %w", err)
}

return cetest.AllOf(matchers...)(innerEvent)
}
return errors.New("event doesn't contain an inner event")
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ func WithEncoding(encoding cloudevents.Encoding) EventSenderOption {
}
}

// WithResponseSink sends the response information as CloudEvent to another sink
func WithResponseSink(responseSink string) EventSenderOption {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
"-response-sink",
responseSink,
)
}
}

// WithEncoding forces the encoding of the event to send from the sender pod
func WithAdditionalHeaders(headers map[string]string) EventSenderOption {
var kv []string
Expand Down
45 changes: 32 additions & 13 deletions test/test_images/event-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ import (
"knative.dev/pkg/tracing/propagation/tracecontextb3"

"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/test/lib/sender"
)

var (
sink string
responseSink string
inputEvent string
eventEncoding string
periodStr string
Expand All @@ -51,6 +53,7 @@ var (

func init() {
flag.StringVar(&sink, "sink", "", "The sink url for the message destination.")
flag.StringVar(&responseSink, "response-sink", "", "The response sink url to send the response.")
flag.StringVar(&inputEvent, "event", "", "Event JSON encoded")
flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event: [binary, structured].")
flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.")
Expand Down Expand Up @@ -82,15 +85,6 @@ func main() {
maxMsg = m
}

defer func() {
var err error
r := recover()
if r != nil {
err = r.(error)
log.Printf("recovered from panic: %v", err)
}
}()

if delay > 0 {
log.Printf("will sleep for %s", delay)
time.Sleep(delay)
Expand Down Expand Up @@ -170,10 +164,35 @@ func main() {
event.SetID(fmt.Sprintf("%d", sequence))
}

if responseEvent, result := c.Request(ctx, event); !cloudevents.IsACK(result) {
log.Printf("send returned an error: %v\n", result)
} else if responseEvent != nil {
log.Printf("Got response from %s\n%s\n", sink, *responseEvent)
log.Printf("I'm going to send\n%s\n", event)

responseEvent, responseResult := c.Request(ctx, event)
if cloudevents.IsUndelivered(responseResult) {
log.Printf("send returned an error: %v\n", responseResult)
} else {
if responseEvent != nil {
log.Printf("Got response from %s\n%s\n%s\n", sink, responseResult, *responseEvent)
} else {
log.Printf("Got response from %s\n%s\n", sink, responseResult)
}

if responseSink != "" {
var httpResult *cehttp.Result
cloudevents.ResultAs(responseResult, &httpResult)
responseEvent := sender.NewSenderEvent(
event.ID(),
"https://knative.dev/eventing/test/event-sender",
responseEvent,
httpResult,
)

result2 := c.Send(cloudevents.ContextWithTarget(context.Background(), responseSink), responseEvent)
if cloudevents.IsUndelivered(result2) {
log.Printf("send to response sink returned an error: %v\n", result2)
} else {
log.Printf("Got response from %s\n%s\n", responseSink, result2)
}
}
}

// Wait for next tick
Expand Down
3 changes: 1 addition & 2 deletions test/test_images/transformevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"

"knative.dev/eventing/pkg/tracing"
)

Expand Down Expand Up @@ -82,8 +83,6 @@ func main() {
}

c, err := cloudevents.NewClientObserved(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
cloudevents.WithTracePropagation,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/cloudevents/sdk-go/v2/client/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/github.com/cloudevents/sdk-go/v2/test/event_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1
github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1
github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1
github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1
# github.com/cloudevents/sdk-go/v2 v2.1.0
# github.com/cloudevents/sdk-go/v2 v2.0.1-0.20200630063327-b91da81265fe
## explicit
github.com/cloudevents/sdk-go/v2
github.com/cloudevents/sdk-go/v2/binding
Expand Down