From 7318d06ac4d535cfd8258189ce28b25fcb8a8928 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 12 Jun 2018 11:49:17 -0400 Subject: [PATCH 1/2] GCP Pub/Sub Knative Bus A Knative Bus implementation backed by GCP Cloud Pub/Sub. Deployment steps: 1. Setup Knative Eventing 2. [Create a service account](https://console.cloud.google.com/iam-admin/serviceaccounts/project) with the 'Pub/Sub Editor' role, and download a new JSON private key. 3. Create a secret for the downloaded key `kubectl create secret generic gcppubsub-bus-key --from-file=key.json=PATH-TO-KEY-FILE.json` 4. Replace `$PROJECT_ID` in `config/buses/gcppubsub.yaml` with your GCP Project ID 5. Apply the 'gcppubsub' Bus `ko apply -f config/buses/gcppubsub.yaml` 6. Create Channels that reference the 'gcppubsub' Bus The bus has an independent provisioner and dispatcher. The provisioner will create GCP Pub/Sub Topics and Subscriptions for each Knative Channel and Subscription (respectively) targeting the Bus. Clients should avoid interacting with topics and subscriptions provisioned by the Bus. The dispatcher can receive events via the Channel Service from inside the cluster. Events on the Pub/Sub topic for an active subscription are forwarded via HTTP to the subscriber. HTTP responses with a 2xx status code are acknowledge while all other status code will have delivery reattempted. Note: Cloud Pub/Sub does not guarantee exactly once delivery, subscribers must guard against multiple deliveries of the same event. To view logs: - for the dispatcher `kail -d gcppubsub-bus -c dispatcher` - for the provisioner `kail -d gcppubsub-bus-provisioner -c provisioner` --- config/buses/gcppubsub.yaml | 62 +++++ pkg/buses/gcppubsub/bus.go | 323 ++++++++++++++++++++++++ pkg/buses/gcppubsub/dispatcher/main.go | 86 +++++++ pkg/buses/gcppubsub/provisioner/main.go | 85 +++++++ 4 files changed, 556 insertions(+) create mode 100644 config/buses/gcppubsub.yaml create mode 100644 pkg/buses/gcppubsub/bus.go create mode 100644 pkg/buses/gcppubsub/dispatcher/main.go create mode 100644 pkg/buses/gcppubsub/provisioner/main.go diff --git a/config/buses/gcppubsub.yaml b/config/buses/gcppubsub.yaml new file mode 100644 index 00000000000..57e7dc3ace0 --- /dev/null +++ b/config/buses/gcppubsub.yaml @@ -0,0 +1,62 @@ +# Copyright 2018 Google, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: channels.knative.dev/v1alpha1 +kind: Bus +metadata: + name: gcppubsub +spec: + provisioner: + name: provisioner + image: github.com/knative/eventing/pkg/buses/gcppubsub/provisioner + args: [ + "-logtostderr", + "-stderrthreshold", "INFO", + ] + env: &env + - name: GOOGLE_CLOUD_PROJECT + value: $PROJECT_ID + - name: GOOGLE_APPLICATION_CREDENTIALS + value: /var/secrets/google/key.json + volumeMounts: + - name: google-cloud-key + mountPath: /var/secrets/google + dispatcher: + name: dispatcher + image: github.com/knative/eventing/pkg/buses/gcppubsub/dispatcher + args: [ + "-logtostderr", + "-stderrthreshold", "INFO", + ] + env: *env + volumeMounts: + - name: google-cloud-key + mountPath: /var/secrets/google + volumes: + - name: google-cloud-key + secret: + secretName: gcppubsub-bus-key +--- +apiVersion: networking.istio.io/v1alpha3 +kind: ServiceEntry +metadata: + name: gcppubsub-bus-ext +spec: + hosts: + - "*.googleapis.com" + - "accounts.google.com" + ports: + - number: 443 + name: https + protocol: HTTPS + location: MESH_EXTERNAL diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go new file mode 100644 index 00000000000..1013f324779 --- /dev/null +++ b/pkg/buses/gcppubsub/bus.go @@ -0,0 +1,323 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gcppubsub + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "cloud.google.com/go/pubsub" + "github.com/golang/glog" + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/buses" +) + +type PubSubBus struct { + name string + monitor *buses.Monitor + pubsubClient *pubsub.Client + client *http.Client + forwardHeaders []string + receivers map[string]context.CancelFunc +} + +func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { + ctx := context.Background() + + topicID := b.topicID(channel) + topic := b.pubsubClient.Topic(topicID) + + // check if topic exists before creating + if exists, err := topic.Exists(ctx); err != nil { + return err + } else if exists { + return nil + } + + glog.Infof("Create topic %q\n", topicID) + topic, err := b.pubsubClient.CreateTopic(ctx, topicID) + if err != nil { + return err + } + + return nil +} + +func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error { + ctx := context.Background() + + topicID := b.topicID(channel) + topic := b.pubsubClient.Topic(topicID) + + // check if topic exists before deleting + if exists, err := topic.Exists(ctx); err != nil { + return err + } else if !exists { + return nil + } + + glog.Infof("Delete topic %q\n", topicID) + return topic.Delete(ctx) +} + +func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscription, attributes buses.Attributes) error { + ctx := context.Background() + + subscriptionID := b.subscriptionID(sub) + subscription := b.pubsubClient.Subscription(subscriptionID) + + // check if subscription exists before creating + if exists, err := subscription.Exists(ctx); err != nil { + return err + } else if exists { + // TODO update subscription configuration + // _, err := subscription.Update(b.ctx, pubsub.SubscriptionConfigToUpdate{}) + // return err + return nil + } + + // create subscription + channel := b.monitor.Channel(sub.Spec.Channel, sub.Namespace) + if channel == nil { + return fmt.Errorf("Cannot create a Subscription for unknown Channel %q", sub.Spec.Channel) + } + topicID := b.topicID(channel) + topic := b.pubsubClient.Topic(topicID) + glog.Infof("Create subscription %q for topic %q\n", subscriptionID, topicID) + subscription, err := b.pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{ + Topic: topic, + }) + return err +} + +func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error { + ctx := context.Background() + + subscriptionID := b.subscriptionID(sub) + subscription := b.pubsubClient.Subscription(subscriptionID) + + // check if subscription exists before deleting + if exists, err := subscription.Exists(ctx); err != nil { + return err + } else if !exists { + return nil + } + + glog.Infof("Deleting subscription %q\n", subscriptionID) + return subscription.Delete(ctx) +} + +func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error { + ctx := context.Background() + + topicID := b.topicID(channel) + topic := b.pubsubClient.Topic(topicID) + + result := topic.Publish(ctx, &pubsub.Message{ + Data: data, + Attributes: attributes, + }) + id, err := result.Get(ctx) + if err != nil { + return err + } + + // TODO allow topics to be reused between publish events, call .Stop after an idle period + topic.Stop() + + glog.Infof("Published a message to %s; msg ID: %v\n", topicID, id) + return nil +} + +func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, attributes buses.Attributes) error { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + + // cancel current subscription receiver, if any + b.StopReceiveEvents(sub) + + subscriptionID := b.subscriptionID(sub) + subscription := b.pubsubClient.Subscription(subscriptionID) + b.receivers[subscriptionID] = cancel + + // check if subscription exists before receiving + if exists, err := subscription.Exists(ctx); err != nil { + return err + } else if !exists { + return fmt.Errorf("cannot receive message for a non-existent subscription %s", subscriptionID) + } + + // subscription.Receive blocks, so run it in a goroutine + go func() { + glog.Infof("Start receiving events for subscription %q\n", subscriptionID) + err := subscription.Receive(cctx, func(ctx context.Context, message *pubsub.Message) { + err := b.DispatchHTTPEvent(sub, message.Data, message.Attributes) + if err != nil { + glog.Warningf("Unable to dispatch event %q to %q", message.ID, sub.Spec.Subscriber) + message.Nack() + } else { + glog.Infof("Dispatched event %q to %q", message.ID, sub.Spec.Subscriber) + message.Ack() + } + }) + if err != nil { + glog.Errorf("Error receiving messesages for %q: %v\n", subscriptionID, err) + } + delete(b.receivers, subscriptionID) + b.monitor.RequeueSubscription(sub) + }() + + return nil +} + +func (b *PubSubBus) StopReceiveEvents(subscription *channelsv1alpha1.Subscription) error { + subscriptionID := b.subscriptionID(subscription) + if cancel, ok := b.receivers[subscriptionID]; ok { + glog.Infof("Stop receiving events for subscription %q\n", subscriptionID) + cancel() + delete(b.receivers, subscriptionID) + } + return nil +} + +func (b *PubSubBus) topicID(channel *channelsv1alpha1.Channel) string { + return fmt.Sprintf("channel-%s-%s-%s", b.name, channel.Namespace, channel.Name) +} + +func (b *PubSubBus) subscriptionID(subscription *channelsv1alpha1.Subscription) string { + return fmt.Sprintf("subscription-%s-%s-%s", b.name, subscription.Namespace, subscription.Name) +} + +func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) { + host := req.Host + glog.Infof("Received request for %s\n", host) + + name, namespace := b.splitChannelName(host) + channel := b.monitor.Channel(name, namespace) + if channel == nil { + res.WriteHeader(http.StatusNotFound) + return + } + + data, err := ioutil.ReadAll(req.Body) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + return + } + + attributes := b.headersToAttributes(b.safeHeaders(req.Header)) + + err = b.SendEventToTopic(channel, data, attributes) + if err != nil { + glog.Warningf("Unable to send event to topic %q: %v", channel.Name, err) + res.WriteHeader(http.StatusInternalServerError) + return + } + + res.WriteHeader(http.StatusAccepted) +} + +func (b *PubSubBus) DispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error { + subscriber := subscription.Spec.Subscriber + url := url.URL{ + Scheme: "http", + Host: subscription.Spec.Subscriber, + Path: "/", + } + req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(data)) + if err != nil { + return err + } + req.Header = b.safeHeaders(b.attributesToHeaders(attributes)) + res, err := b.client.Do(req) + if err != nil { + return err + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + return fmt.Errorf("Subscribing service %q did not accept event: got HTTP %d", subscriber, res.StatusCode) + } + return nil +} + +func (b *PubSubBus) splitChannelName(host string) (string, string) { + chunks := strings.Split(host, ".") + channel := chunks[0] + namespace := chunks[1] + return channel, namespace +} + +func (b *PubSubBus) safeHeaders(raw http.Header) http.Header { + safe := http.Header{} + for _, header := range b.forwardHeaders { + if value := raw.Get(header); value != "" { + safe.Set(header, value) + } + } + return safe +} + +func (b *PubSubBus) headersToAttributes(headers http.Header) map[string]string { + attributes := make(map[string]string) + for name, value := range headers { + // TODO hanle compound headers + attributes[name] = value[0] + } + return attributes +} + +func (b *PubSubBus) attributesToHeaders(attributes map[string]string) http.Header { + headers := http.Header{} + for name, value := range attributes { + headers.Set(name, value) + } + return headers +} + +func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSubBus, error) { + forwardHeaders := []string{ + "content-type", + "x-request-id", + "x-b3-traceid", + "x-b3-spanid", + "x-b3-parentspanid", + "x-b3-sampled", + "x-b3-flags", + "x-ot-span-context", + } + + ctx := context.Background() + pubsubClient, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return nil, err + } + + bus := PubSubBus{ + name: name, + monitor: monitor, + pubsubClient: pubsubClient, + client: &http.Client{}, + forwardHeaders: forwardHeaders, + receivers: map[string]context.CancelFunc{}, + } + + return &bus, nil +} diff --git a/pkg/buses/gcppubsub/dispatcher/main.go b/pkg/buses/gcppubsub/dispatcher/main.go new file mode 100644 index 00000000000..dd3f8e05a4d --- /dev/null +++ b/pkg/buses/gcppubsub/dispatcher/main.go @@ -0,0 +1,86 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + + "github.com/golang/glog" + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/buses/gcppubsub" + "github.com/knative/eventing/pkg/signals" +) + +const ( + threadsPerMonitor = 1 +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + defer glog.Flush() + + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + namespace := os.Getenv("BUS_NAMESPACE") + name := os.Getenv("BUS_NAME") + projectID := os.Getenv("GOOGLE_CLOUD_PROJECT") + if projectID == "" { + glog.Fatalf("GOOGLE_CLOUD_PROJECT environment variable must be set.\n") + } + + component := fmt.Sprintf("%s-%s", name, buses.Dispatcher) + + var bus *gcppubsub.PubSubBus + monitor := buses.NewMonitor(component, masterURL, kubeconfig, buses.MonitorEventHandlerFuncs{ + SubscribeFunc: func(subscription *channelsv1alpha1.Subscription, attributes buses.Attributes) error { + return bus.ReceiveEvents(subscription, attributes) + }, + UnsubscribeFunc: func(subscription *channelsv1alpha1.Subscription) error { + return bus.StopReceiveEvents(subscription) + }, + }) + bus, err := gcppubsub.NewPubSubBus(name, projectID, monitor) + if err != nil { + glog.Fatalf("Failed to create pubsub bus: %v", err) + } + + go func() { + if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { + glog.Fatalf("Error running monitor: %s", err.Error()) + } + }() + + glog.Infoln("Starting web server") + http.HandleFunc("/", bus.ReceiveHTTPEvent) + glog.Fatal(http.ListenAndServe(":8080", nil)) +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} diff --git a/pkg/buses/gcppubsub/provisioner/main.go b/pkg/buses/gcppubsub/provisioner/main.go new file mode 100644 index 00000000000..a71e4b2f7e1 --- /dev/null +++ b/pkg/buses/gcppubsub/provisioner/main.go @@ -0,0 +1,85 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/golang/glog" + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/buses/gcppubsub" + "github.com/knative/eventing/pkg/signals" +) + +const ( + threadsPerMonitor = 1 +) + +var ( + masterURL string + kubeconfig string +) + +func main() { + defer glog.Flush() + + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + namespace := os.Getenv("BUS_NAMESPACE") + name := os.Getenv("BUS_NAME") + projectID := os.Getenv("GOOGLE_CLOUD_PROJECT") + if projectID == "" { + glog.Fatalf("GOOGLE_CLOUD_PROJECT environment variable must be set.\n") + } + + component := fmt.Sprintf("%s-%s", name, buses.Provisioner) + + var bus *gcppubsub.PubSubBus + monitor := buses.NewMonitor(component, masterURL, kubeconfig, buses.MonitorEventHandlerFuncs{ + ProvisionFunc: func(channel *channelsv1alpha1.Channel, attributes buses.Attributes) error { + return bus.CreateTopic(channel, attributes) + }, + UnprovisionFunc: func(channel *channelsv1alpha1.Channel) error { + return bus.DeleteTopic(channel) + }, + SubscribeFunc: func(subscription *channelsv1alpha1.Subscription, attributes buses.Attributes) error { + return bus.CreateOrUpdateSubscription(subscription, attributes) + }, + UnsubscribeFunc: func(subscription *channelsv1alpha1.Subscription) error { + return bus.DeleteSubscription(subscription) + }, + }) + bus, err := gcppubsub.NewPubSubBus(name, projectID, monitor) + if err != nil { + glog.Fatalf("Failed to create pubsub bus: %v", err) + } + + if err := monitor.Run(namespace, name, threadsPerMonitor, stopCh); err != nil { + glog.Fatalf("Error running monitor: %s", err.Error()) + } +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") +} From e5530cc32b64e0179553fb163eac230e2625a8b8 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Tue, 26 Jun 2018 08:27:51 -0400 Subject: [PATCH 2/2] update copyright headers --- config/buses/gcppubsub.yaml | 2 +- pkg/buses/gcppubsub/bus.go | 2 +- pkg/buses/gcppubsub/dispatcher/main.go | 2 +- pkg/buses/gcppubsub/provisioner/main.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/buses/gcppubsub.yaml b/config/buses/gcppubsub.yaml index 57e7dc3ace0..f48157f3c21 100644 --- a/config/buses/gcppubsub.yaml +++ b/config/buses/gcppubsub.yaml @@ -1,4 +1,4 @@ -# Copyright 2018 Google, Inc. All rights reserved. +# Copyright 2018 The Knative Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index 1013f324779..59520c9ac42 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018 The Knative Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/pkg/buses/gcppubsub/dispatcher/main.go b/pkg/buses/gcppubsub/dispatcher/main.go index dd3f8e05a4d..4f99eb8ecc4 100644 --- a/pkg/buses/gcppubsub/dispatcher/main.go +++ b/pkg/buses/gcppubsub/dispatcher/main.go @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018 The Knative Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/pkg/buses/gcppubsub/provisioner/main.go b/pkg/buses/gcppubsub/provisioner/main.go index a71e4b2f7e1..5bf6f8a067c 100644 --- a/pkg/buses/gcppubsub/provisioner/main.go +++ b/pkg/buses/gcppubsub/provisioner/main.go @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018 The Knative Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.