Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
knative.dev/hack v0.0.0-20230417170854-f591fea109b3
knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0
knative.dev/reconciler-test v0.0.0-20230523150310-f5d5aaa8cb3c
knative.dev/reconciler-test v0.0.0-20230808072333-158379f56e0b
sigs.k8s.io/yaml v1.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1049,8 +1049,8 @@ knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3 h1:TUHxKhNDLCX/XaqNaX
knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs=
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0 h1:EFQcoUo8I4bc+U3y6tR1B3ONYZSHWUdAfI7Vh7dae8g=
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0/go.mod h1:2qWPP9Gjh9Q7ETti+WRHnBnGCSCq+6q7m3p/nmUQviE=
knative.dev/reconciler-test v0.0.0-20230523150310-f5d5aaa8cb3c h1:epYgWvrm6MLxHJJ8H7h5qSIja+cxS56u1HQirbm3WE0=
knative.dev/reconciler-test v0.0.0-20230523150310-f5d5aaa8cb3c/go.mod h1:By7fsbkjKWbTmxwAs9lL1itxZI1otbhiEsAZmprEtvI=
knative.dev/reconciler-test v0.0.0-20230808072333-158379f56e0b h1:8cBsA3p9IXv6C+g0KW5efnS9tWWOJl4m0vI4uaa6bhQ=
knative.dev/reconciler-test v0.0.0-20230808072333-158379f56e0b/go.mod h1:By7fsbkjKWbTmxwAs9lL1itxZI1otbhiEsAZmprEtvI=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
6 changes: 6 additions & 0 deletions vendor/knative.dev/reconciler-test/cmd/eventshub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"knative.dev/pkg/logging"
"knative.dev/reconciler-test/pkg/eventshub/forwarder"

"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/logger_vent"
Expand All @@ -45,6 +46,11 @@ func main() {
eventshub.SenderEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error {
return sender.Start(ctx, logs, eventshub.WithClientTracing)
},
eventshub.ForwarderEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error {
return forwarder.NewFromEnv(ctx, logs,
[]eventshub.HandlerFunc{eventshub.WithServerTracing},
[]eventshub.ClientOption{eventshub.WithClientTracing}).Start(ctx)
},
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .name }}
name: {{ .serviceName }}
namespace: {{ .namespace }}
spec:
selector:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2023 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: {{ .name }}
namespace: {{ .namespace }}
{{ if .annotations }}
annotations:
{{ range $key, $value := .annotations }}
{{ $key }}: "{{ $value }}"
{{ end }}
{{ end }}
spec:
template:
{{ if .podannotations }}
annotations:
{{ range $key, $value := .podannotations }}
{{ $key }}: "{{ $value }}"
{{ end }}
{{ end }}
spec:
serviceAccountName: "{{ .name }}"
containers:
- name: eventshub-forwarder
image: {{ .image }}
imagePullPolicy: "IfNotPresent"
env:
- name: "NAME"
value: {{ .name }}
- name: "NAMESPACE"
value: {{ .namespace }}
- name: "SINK"
value: {{ .sink }}
{{ range $key, $value := .envs }}
- name: {{printf "%q" $key}}
value: {{printf "%q" $value}}
{{ end }}
5 changes: 3 additions & 2 deletions vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func (e *EventLogs) Vent(observed EventInfo) error {
}

const (
ReceiverEventGenerator string = "receiver"
SenderEventGenerator string = "sender"
ReceiverEventGenerator string = "receiver"
SenderEventGenerator string = "sender"
ForwarderEventGenerator string = "forwarder"

RecorderEventLog string = "recorder"
LoggerEventLog string = "logger"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ImageFromContext(ctx context.Context) string {
if e, ok := ctx.Value(eventshubImageKey{}).(string); ok {
return e
}
return "ko://" + cmdPackage()
return "ko://" + eventshubPackage()
}

// WithCustomImage allows you to specify a custom eventshub image to be used when invoking eventshub.Install
Expand All @@ -48,7 +48,7 @@ func registerImage(ctx context.Context) error {
return err
}

func cmdPackage() string {
func eventshubPackage() string {
this := reflect.TypeOf(eventshubImageKey{}).PkgPath()
root := path.Dir(path.Dir(this))
return path.Join(root, "cmd", "eventshub")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
Copyright 2023 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package forwarder

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
"go.opencensus.io/trace"
"go.uber.org/zap"

"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
"knative.dev/pkg/logging"

"knative.dev/reconciler-test/pkg/eventshub"
)

// Forwarder is the entry point for sinking events into the event log.
type Forwarder struct {
// Name is the name of this Forwarder.
Name string

// The current namespace.
Namespace string `envconfig:"NAMESPACE" required:"true"`

// Sink
Sink string

// EventLogs is the list of EventLogger implementors to vent observed events.
EventLogs *eventshub.EventLogs

ctx context.Context
handlerFuncs []eventshub.HandlerFunc
clientOpts []eventshub.ClientOption
httpClient *http.Client
}

type envConfig struct {
// Name is used to identify this instance of the forwarder.
Name string `envconfig:"NAME" default:"forwarder-default" required:"true"`

// The current namespace.
Namespace string `envconfig:"NAMESPACE" required:"true"`

// Sink url for the message destination
Sink string `envconfig:"SINK" required:"true"`
}

func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFuncs []eventshub.HandlerFunc, clientOpts []eventshub.ClientOption) *Forwarder {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
logging.FromContext(ctx).Fatal("Failed to process env var", err)
}

logging.FromContext(ctx).Infof("Forwarder environment configuration: %+v", env)

return &Forwarder{
Name: env.Name,
Namespace: env.Namespace,
Sink: env.Sink,
EventLogs: eventLogs,
ctx: ctx,
handlerFuncs: handlerFuncs,
clientOpts: clientOpts,
httpClient: &http.Client{},
}
}

// Start will create the CloudEvents client and start listening for inbound
// HTTP requests. This is a blocking call.
func (o *Forwarder) Start(ctx context.Context) error {
var handler http.Handler = o

for _, opt := range o.clientOpts {
if err := opt(o.httpClient); err != nil {
return fmt.Errorf("unable to apply client option: %w", err)
}
}

for _, dec := range o.handlerFuncs {
handler = dec(handler)
}

server := &http.Server{Addr: ":8080", Handler: handler}

var err error
go func() {
err = server.ListenAndServe()
}()

<-ctx.Done()

if err != nil {
return fmt.Errorf("error while starting the HTTP server: %w", err)
}

logging.FromContext(ctx).Info("Closing the HTTP server")

return server.Close()
}

func (o *Forwarder) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
requestCtx, span := trace.StartSpan(request.Context(), "eventshub-forwarder")
defer span.End()

m := cloudeventshttp.NewMessageFromHttpRequest(request)
defer m.Finish(nil)

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
headers := make(http.Header)
for k, v := range request.Header {
if !strings.HasPrefix(k, "Ce-") {
headers[k] = v
}
}
// Host header is removed from the request.Header map by net/http
if request.Host != "" {
headers.Set("Host", request.Host)
}

eventErrStr := ""
if eventErr != nil {
eventErrStr = eventErr.Error()
}

eventInfo := eventshub.EventInfo{
Error: eventErrStr,
Event: event,
Observer: o.Name,
HTTPHeaders: headers,
Origin: request.RemoteAddr,
Time: time.Now(),
Kind: eventshub.EventReceived,
}

// Log the event that is being forwarded
if err := o.EventLogs.Vent(eventInfo); err != nil {
logging.FromContext(o.ctx).Fatalw("Error while venting the received event", zap.Error(err))
}

req, err := http.NewRequestWithContext(requestCtx, http.MethodPost, o.Sink, nil)
if err != nil {
logging.FromContext(o.ctx).Error("Cannot create the request: ", err)
}

err = cehttp.WriteRequest(requestCtx, binding.ToMessage(event), req)
if err != nil {
logging.FromContext(o.ctx).Error("Cannot write the event: ", err)
}

eventString := "unknown"
if event != nil {
eventString = event.String()
}
span.AddAttributes(
trace.StringAttribute("namespace", o.Namespace),
trace.StringAttribute("event", eventString),
)

res, err := o.httpClient.Do(req)

// Publish sent event info
if err := o.EventLogs.Vent(o.sentInfo(event, req, err)); err != nil {
logging.FromContext(o.ctx).Error("Cannot log forwarded event: ", err)
}

if err == nil {
// Vent the response info
if err := o.EventLogs.Vent(o.responseInfo(res, event)); err != nil {
logging.FromContext(o.ctx).Error("Cannot log response for forwarded event: ", err)
}
}

writer.WriteHeader(http.StatusAccepted)
}

func (o *Forwarder) sentInfo(event *cloudevents.Event, req *http.Request, err error) eventshub.EventInfo {
var eventId string
if event != nil {
eventId = event.ID()
}

eventInfo := eventshub.EventInfo{
Kind: eventshub.EventSent,
Origin: o.Name,
Observer: o.Name,
Time: time.Now(),
SentId: eventId,
}

sentHeaders := make(http.Header)
for k, v := range req.Header {
sentHeaders[k] = v
}
eventInfo.HTTPHeaders = sentHeaders

if err != nil {
eventInfo.Error = err.Error()
} else {
eventInfo.Event = event
}

return eventInfo
}

func (o *Forwarder) responseInfo(res *http.Response, event *cloudevents.Event) eventshub.EventInfo {
var eventId string
if event != nil {
eventId = event.ID()
}

responseInfo := eventshub.EventInfo{
Kind: eventshub.EventResponse,
HTTPHeaders: res.Header,
Origin: o.Sink,
Observer: o.Name,
Time: time.Now(),
StatusCode: res.StatusCode,
SentId: eventId,
}

responseMessage := cehttp.NewMessageFromHttpResponse(res)

if responseMessage.ReadEncoding() == cloudeventsbindings.EncodingUnknown {
body, err := ioutil.ReadAll(res.Body)

if err != nil {
responseInfo.Error = err.Error()
} else {
responseInfo.Body = body
}
} else {
responseEvent, err := cloudeventsbindings.ToEvent(context.Background(), responseMessage)
if err != nil {
responseInfo.Error = err.Error()
} else {
responseInfo.Event = responseEvent
}
}
return responseInfo
}
Loading