Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 57 additions & 29 deletions pkg/buses/gcppubsub/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ import (
"github.com/knative/eventing/pkg/buses"
)

// PubSubBus implements a Bus transport using GCP PubSub Topics and
// Subscriptions to persist events as messages.
type PubSubBus struct {
name string
monitor *buses.Monitor
pubsubClient *pubsub.Client
client *http.Client
forwardHeaders []string
receivers map[string]context.CancelFunc
name string
monitor *buses.Monitor
pubsubClient *pubsub.Client
client *http.Client
forwardHeaders map[string]bool
forwardPrefixes []string
receivers map[string]context.CancelFunc
}

// CreateTopic creates a GCP PubSub Topic based on the channel's name and the
// bus's project id.
func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, parameters buses.ResolvedParameters) error {
ctx := context.Background()

Expand All @@ -62,6 +67,7 @@ func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, parameters bu
return nil
}

// DeleteTopic deletes the GCP PubSub topic associated with the specified chanel.
func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error {
ctx := context.Background()

Expand All @@ -79,6 +85,10 @@ func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error {
return topic.Delete(ctx)
}

// CreateOrUpdateSubscription creates a Subscription in GCP PubSub connected to
// a Topic based on the Subscription's channel and the project stored in the
// Bus. The Subscription is a pull subscription used by this controller to
// deliver events.
func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscription, parameters buses.ResolvedParameters) error {
ctx := context.Background()

Expand Down Expand Up @@ -109,6 +119,7 @@ func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscriptio
return err
}

// DeleteSubscription deletes a GCP PubSub subscription.
func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error {
ctx := context.Background()

Expand All @@ -126,7 +137,7 @@ func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error
return subscription.Delete(ctx)
}

func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error {
func (b *PubSubBus) sendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error {
ctx := context.Background()

topicID := b.topicID(channel)
Expand All @@ -148,6 +159,7 @@ func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []b
return nil
}

// ReceiveEvents extracts a single event from a message on a GCP Subscription.
func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters buses.ResolvedParameters) error {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)
Expand All @@ -170,7 +182,7 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters
go func() {
glog.Infof("Start receiving events for subscription %q\n", subscriptionID)
err := subscription.Receive(cctx, func(ctx context.Context, message *pubsub.Message) {
err := b.DispatchHTTPEvent(sub, message.Data, message.Attributes)
err := b.dispatchHTTPEvent(sub, message.Data, message.Attributes)
if err != nil {
glog.Warningf("Unable to dispatch event %q to %q", message.ID, sub.Spec.Subscriber)
message.Nack()
Expand All @@ -189,6 +201,7 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters
return nil
}

// StopReceiveEvents cancels a running ReceieveEvents polling operation.
func (b *PubSubBus) StopReceiveEvents(subscription *channelsv1alpha1.Subscription) error {
subscriptionID := b.subscriptionID(subscription)
if cancel, ok := b.receivers[subscriptionID]; ok {
Expand All @@ -207,6 +220,8 @@ func (b *PubSubBus) subscriptionID(subscription *channelsv1alpha1.Subscription)
return fmt.Sprintf("subscription-%s-%s-%s", b.name, subscription.Namespace, subscription.Name)
}

// ReceiveHTTPEvent converts an event received on the Channel's HTTP endpoint
// and enqueues it in GCP PubSub.
func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) {
host := req.Host
glog.Infof("Received request for %s\n", host)
Expand All @@ -226,7 +241,7 @@ func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request)

attributes := b.headersToAttributes(b.safeHeaders(req.Header))

err = b.SendEventToTopic(channel, data, attributes)
err = b.sendEventToTopic(channel, data, attributes)
if err != nil {
glog.Warningf("Unable to send event to topic %q: %v", channel.Name, err)
res.WriteHeader(http.StatusInternalServerError)
Expand All @@ -236,7 +251,7 @@ func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request)
res.WriteHeader(http.StatusAccepted)
}

func (b *PubSubBus) DispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error {
func (b *PubSubBus) dispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error {
subscriber := subscription.Spec.Subscriber
url := url.URL{
Scheme: "http",
Expand Down Expand Up @@ -267,9 +282,16 @@ func (b *PubSubBus) splitChannelName(host string) (string, string) {

func (b *PubSubBus) safeHeaders(raw http.Header) http.Header {
safe := http.Header{}
for _, header := range b.forwardHeaders {
if value := raw.Get(header); value != "" {
safe.Set(header, value)
for h, v := range raw {
if _, ok := b.forwardHeaders[h]; ok {
safe[h] = v
break
}
for _, p := range b.forwardPrefixes {
if strings.HasPrefix(h, p) {
safe[h] = v
break
}
}
}
return safe
Expand All @@ -278,7 +300,7 @@ func (b *PubSubBus) safeHeaders(raw http.Header) http.Header {
func (b *PubSubBus) headersToAttributes(headers http.Header) map[string]string {
attributes := make(map[string]string)
for name, value := range headers {
// TODO hanle compound headers
// TODO: handle compound headers
attributes[name] = value[0]
}
return attributes
Expand All @@ -292,16 +314,21 @@ func (b *PubSubBus) attributesToHeaders(attributes map[string]string) http.Heade
return headers
}

// NewPubSubBus creates a new bus backed by GCP PubSub topics and subscriptions
// in the specified project.
func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSubBus, error) {
forwardHeaders := []string{
"content-type",
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
forwardHeaders := map[string]bool{
"content-type": true,
"x-request-id": true,
"x-b3-traceid": true,
"x-b3-spanid": true,
"x-b3-parentspanid": true,
"x-b3-sampled": true,
"x-b3-flags": true,
"x-ot-span-context": true,
}
forwardPrefixes := []string{
"ce-",
}

ctx := context.Background()
Expand All @@ -311,12 +338,13 @@ func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSu
}

bus := PubSubBus{
name: name,
monitor: monitor,
pubsubClient: pubsubClient,
client: &http.Client{},
forwardHeaders: forwardHeaders,
receivers: map[string]context.CancelFunc{},
name: name,
monitor: monitor,
pubsubClient: pubsubClient,
client: &http.Client{},
forwardHeaders: forwardHeaders,
forwardPrefixes: forwardPrefixes,
receivers: map[string]context.CancelFunc{},
}

return &bus, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/buses/kafka/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func initialOffset(parameters buses.ResolvedParameters) (int64, error) {
func kafka2HttpHeaders(message *sarama.ConsumerMessage) http.Header {
result := make(http.Header)
for _, h := range message.Headers {
result[string(h.Key)] = []string{string(h.Value)}
result.Set(string(h.Key), string(h.Value))
}
return result
}
Expand Down
57 changes: 37 additions & 20 deletions pkg/buses/stub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ var (
kubeconfig string
)

// StubBus is an in-memory forwarding implementation that forwards each
// channel to subscribers in-memory.
type StubBus struct {
name string
monitor *buses.Monitor
client *http.Client
forwardHeaders []string
name string
monitor *buses.Monitor
client *http.Client
forwardHeaders map[string]bool
forwardPrefixes []string
}

func (b *StubBus) handleEvent(res http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -113,31 +116,45 @@ func (b *StubBus) splitChannelName(host string) (string, string) {

func (b *StubBus) safeHeaders(raw http.Header) http.Header {
safe := http.Header{}
for _, header := range b.forwardHeaders {
if value := raw.Get(header); value != "" {
safe.Set(header, value)
for h, v := range raw {
if _, ok := b.forwardHeaders[h]; ok {
safe[h] = v
break
}
for _, p := range b.forwardPrefixes {
if strings.HasPrefix(h, p) {
safe[h] = v
break
}
}
}
return safe
}

// NewStubBus creates a StubBus which forwards requests using a header
// whitelist.
func NewStubBus(name string, monitor *buses.Monitor) *StubBus {
forwardHeaders := []string{
"content-type",
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
forwardHeaders := map[string]bool{
"content-type": true,
"x-request-id": true,
"x-b3-traceid": true,
"x-b3-spanid": true,
"x-b3-parentspanid": true,
"x-b3-sampled": true,
"x-b3-flags": true,
"x-ot-span-context": true,
}
// TODO: should we instead blacklist specific headers?
forwardPrefixes := []string{
"ce-",
}

bus := StubBus{
name: name,
monitor: monitor,
client: &http.Client{},
forwardHeaders: forwardHeaders,
name: name,
monitor: monitor,
client: &http.Client{},
forwardHeaders: forwardHeaders,
forwardPrefixes: forwardPrefixes,
}

return &bus
Expand Down