Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions config/buses/gcppubsub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: channels.knative.dev/v1alpha1
kind: Bus
metadata:
name: gcppubsub
spec:
provisioner:
name: provisioner
image: github.com/knative/eventing/pkg/buses/gcppubsub/provisioner
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
]
env: &env
- name: GOOGLE_CLOUD_PROJECT
value: $PROJECT_ID
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/key.json
volumeMounts:
- name: google-cloud-key
mountPath: /var/secrets/google
dispatcher:
name: dispatcher
image: github.com/knative/eventing/pkg/buses/gcppubsub/dispatcher
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
]
env: *env
volumeMounts:
- name: google-cloud-key
mountPath: /var/secrets/google
volumes:
- name: google-cloud-key
secret:
secretName: gcppubsub-bus-key
---
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: gcppubsub-bus-ext
spec:
hosts:
- "*.googleapis.com"
- "accounts.google.com"
ports:
- number: 443
name: https
protocol: HTTPS
location: MESH_EXTERNAL
323 changes: 323 additions & 0 deletions pkg/buses/gcppubsub/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
/*
* Copyright 2018 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package gcppubsub

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"

"cloud.google.com/go/pubsub"
"github.com/golang/glog"
channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1"
"github.com/knative/eventing/pkg/buses"
)

type PubSubBus struct {
name string
monitor *buses.Monitor
pubsubClient *pubsub.Client
client *http.Client
forwardHeaders []string
receivers map[string]context.CancelFunc
}

func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error {
ctx := context.Background()

topicID := b.topicID(channel)
topic := b.pubsubClient.Topic(topicID)

// check if topic exists before creating
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just curious about the pattern of checking for existence before trying these operations and why it's being done in every case. With this check then do something else, there's a race condition anyways, so like I said, just curious about this pattern.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to create a topic will return an error if the topic already exists. Rather than try to inspect the error I found it easier to check if the topic exists before creating.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant the pattern in each of these, like delete for example and so forth. Same deal there?

Copy link
Copy Markdown
Contributor Author

@scothis scothis Jun 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each operation will return an error unless the client was directly responsible for achieving the desired effect. So deleting a topic that doesn't exist will return an error. The published doc doesn't distinguish between different types of errors. There are a few hint in the codebase that NOT_FOUND is distinguished from ALREADY_EXISTS and other errors, but it's not part of the public interface from what I can see.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To your comment about a race condition, the provisioner is a singleton, and the workqueue guarantees that any given resource will never be processed more than once concurrently.

if exists, err := topic.Exists(ctx); err != nil {
return err
} else if exists {
return nil
}

glog.Infof("Create topic %q\n", topicID)
topic, err := b.pubsubClient.CreateTopic(ctx, topicID)
if err != nil {
return err
}

return nil
}

func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error {
ctx := context.Background()

topicID := b.topicID(channel)
topic := b.pubsubClient.Topic(topicID)

// check if topic exists before deleting
if exists, err := topic.Exists(ctx); err != nil {
return err
} else if !exists {
return nil
}

glog.Infof("Delete topic %q\n", topicID)
return topic.Delete(ctx)
}

func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscription, attributes buses.Attributes) error {
ctx := context.Background()

subscriptionID := b.subscriptionID(sub)
subscription := b.pubsubClient.Subscription(subscriptionID)

// check if subscription exists before creating
if exists, err := subscription.Exists(ctx); err != nil {
return err
} else if exists {
// TODO update subscription configuration
// _, err := subscription.Update(b.ctx, pubsub.SubscriptionConfigToUpdate{})
// return err
return nil
}

// create subscription
channel := b.monitor.Channel(sub.Spec.Channel, sub.Namespace)
if channel == nil {
return fmt.Errorf("Cannot create a Subscription for unknown Channel %q", sub.Spec.Channel)
}
topicID := b.topicID(channel)
topic := b.pubsubClient.Topic(topicID)
glog.Infof("Create subscription %q for topic %q\n", subscriptionID, topicID)
subscription, err := b.pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
Topic: topic,
})
return err
}

func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error {
ctx := context.Background()

subscriptionID := b.subscriptionID(sub)
subscription := b.pubsubClient.Subscription(subscriptionID)

// check if subscription exists before deleting
if exists, err := subscription.Exists(ctx); err != nil {
return err
} else if !exists {
return nil
}

glog.Infof("Deleting subscription %q\n", subscriptionID)
return subscription.Delete(ctx)
}

func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error {
ctx := context.Background()

topicID := b.topicID(channel)
topic := b.pubsubClient.Topic(topicID)

result := topic.Publish(ctx, &pubsub.Message{
Data: data,
Attributes: attributes,
})
id, err := result.Get(ctx)
if err != nil {
return err
}

// TODO allow topics to be reused between publish events, call .Stop after an idle period
topic.Stop()

glog.Infof("Published a message to %s; msg ID: %v\n", topicID, id)
return nil
}

func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, attributes buses.Attributes) error {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

// cancel current subscription receiver, if any
b.StopReceiveEvents(sub)

subscriptionID := b.subscriptionID(sub)
subscription := b.pubsubClient.Subscription(subscriptionID)
b.receivers[subscriptionID] = cancel

// check if subscription exists before receiving
if exists, err := subscription.Exists(ctx); err != nil {
return err
} else if !exists {
return fmt.Errorf("cannot receive message for a non-existent subscription %s", subscriptionID)
}

// subscription.Receive blocks, so run it in a goroutine
go func() {
glog.Infof("Start receiving events for subscription %q\n", subscriptionID)
err := subscription.Receive(cctx, func(ctx context.Context, message *pubsub.Message) {
err := b.DispatchHTTPEvent(sub, message.Data, message.Attributes)
if err != nil {
glog.Warningf("Unable to dispatch event %q to %q", message.ID, sub.Spec.Subscriber)
message.Nack()
} else {
glog.Infof("Dispatched event %q to %q", message.ID, sub.Spec.Subscriber)
message.Ack()
}
})
if err != nil {
glog.Errorf("Error receiving messesages for %q: %v\n", subscriptionID, err)
}
delete(b.receivers, subscriptionID)
b.monitor.RequeueSubscription(sub)
}()

return nil
}

func (b *PubSubBus) StopReceiveEvents(subscription *channelsv1alpha1.Subscription) error {
subscriptionID := b.subscriptionID(subscription)
if cancel, ok := b.receivers[subscriptionID]; ok {
glog.Infof("Stop receiving events for subscription %q\n", subscriptionID)
cancel()
delete(b.receivers, subscriptionID)
}
return nil
}

func (b *PubSubBus) topicID(channel *channelsv1alpha1.Channel) string {
return fmt.Sprintf("channel-%s-%s-%s", b.name, channel.Namespace, channel.Name)
}

func (b *PubSubBus) subscriptionID(subscription *channelsv1alpha1.Subscription) string {
return fmt.Sprintf("subscription-%s-%s-%s", b.name, subscription.Namespace, subscription.Name)
}

func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) {
host := req.Host
glog.Infof("Received request for %s\n", host)

name, namespace := b.splitChannelName(host)
channel := b.monitor.Channel(name, namespace)
if channel == nil {
res.WriteHeader(http.StatusNotFound)
return
}

data, err := ioutil.ReadAll(req.Body)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
return
}

attributes := b.headersToAttributes(b.safeHeaders(req.Header))

err = b.SendEventToTopic(channel, data, attributes)
if err != nil {
glog.Warningf("Unable to send event to topic %q: %v", channel.Name, err)
res.WriteHeader(http.StatusInternalServerError)
return
}

res.WriteHeader(http.StatusAccepted)
}

func (b *PubSubBus) DispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error {
subscriber := subscription.Spec.Subscriber
url := url.URL{
Scheme: "http",
Host: subscription.Spec.Subscriber,
Path: "/",
}
req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(data))
if err != nil {
return err
}
req.Header = b.safeHeaders(b.attributesToHeaders(attributes))
res, err := b.client.Do(req)
if err != nil {
return err
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
return fmt.Errorf("Subscribing service %q did not accept event: got HTTP %d", subscriber, res.StatusCode)
}
return nil
}

func (b *PubSubBus) splitChannelName(host string) (string, string) {
chunks := strings.Split(host, ".")
channel := chunks[0]
namespace := chunks[1]
return channel, namespace
}

func (b *PubSubBus) safeHeaders(raw http.Header) http.Header {
safe := http.Header{}
for _, header := range b.forwardHeaders {
if value := raw.Get(header); value != "" {
safe.Set(header, value)
}
}
return safe
}

func (b *PubSubBus) headersToAttributes(headers http.Header) map[string]string {
attributes := make(map[string]string)
for name, value := range headers {
// TODO hanle compound headers
attributes[name] = value[0]
}
return attributes
}

func (b *PubSubBus) attributesToHeaders(attributes map[string]string) http.Header {
headers := http.Header{}
for name, value := range attributes {
headers.Set(name, value)
}
return headers
}

func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSubBus, error) {
forwardHeaders := []string{
"content-type",
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
}

ctx := context.Background()
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, err
}

bus := PubSubBus{
name: name,
monitor: monitor,
pubsubClient: pubsubClient,
client: &http.Client{},
forwardHeaders: forwardHeaders,
receivers: map[string]context.CancelFunc{},
}

return &bus, nil
}
Loading