Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions test/performance/infra/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"

"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/test/performance/infra/common"
pb "knative.dev/eventing/test/performance/infra/event_state"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ func (r *Receiver) processEvents() {
}

func (r *Receiver) startCloudEventsReceiver(ctx context.Context) error {
cli, err := kncloudevents.NewDefaultClient()
cli, err := cloudevents.NewDefaultClient()
if err != nil {
return fmt.Errorf("failed to create CloudEvents client: %v", err)
}
Expand Down
22 changes: 8 additions & 14 deletions test/test_images/filterevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"flag"
"log"

cloudevents "github.com/cloudevents/sdk-go"
cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

var (
Expand All @@ -32,26 +33,19 @@ func init() {
flag.BoolVar(&filter, "filter", false, "Whether to filter the event")
}

func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error {
func gotEvent(event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
ctx := event.Context.AsV1()

dataBytes, err := event.DataBytes()
if err != nil {
log.Printf("Got Data Error: %s\n", err.Error())
return err
}
log.Println("Received a new event: ")
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), string(event.Data()))

if filter {
log.Println("Filter event")
resp.Status = 200
} else {
event.SetDataContentType(cloudevents.ApplicationJSON)
log.Println("Reply with event")
resp.RespondWith(200, &event)
return nil, cehttp.NewResult(200, "OK")
}
return nil
event.SetDataContentType(cloudevents.ApplicationJSON)
log.Println("Reply with event")
return &event, cehttp.NewResult(200, "OK")
}

func main() {
Expand Down
22 changes: 12 additions & 10 deletions test/test_images/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (

duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/kncloudevents"

cloudevents "github.com/cloudevents/sdk-go"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
)

Expand Down Expand Up @@ -91,7 +89,12 @@ func main() {
ceOverrides = &overrides
}

c, err := kncloudevents.NewDefaultClient(sink)
p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
}
Expand All @@ -114,10 +117,9 @@ func main() {
for {
hb.Sequence++

event := cloudevents.NewEvent("1.0")
event := cloudevents.NewEvent()
event.SetType("dev.knative.eventing.samples.heartbeat")
event.SetSource(source)
event.SetDataContentType(cloudevents.ApplicationJSON)
event.SetExtension("the", 42)
event.SetExtension("heart", "yes")
event.SetExtension("beats", true)
Expand All @@ -128,13 +130,13 @@ func main() {
}
}

if err := event.SetData(hb); err != nil {
log.Printf("failed to set cloudevents data: %s", err.Error())
if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
log.Fatalf("failed to set cloudevents data: %s", err.Error())
}

log.Printf("sending cloudevent to %s", sink)
if _, _, err := c.Send(context.Background(), event); err != nil {
log.Printf("failed to send cloudevent: %s", err.Error())
if result := c.Send(context.Background(), event); !cloudevents.IsACK(result) {
log.Printf("failed to send cloudevent: %s", result.Error())
}

if env.OneShot {
Expand Down