From ee0e0dd740421524ed485c6be5ac7da09c708ef1 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Fri, 21 Sep 2018 19:06:12 -0400 Subject: [PATCH] Generate unique, stable pubsub topic and subscription IDs With the gcppubsub bus it was possible to create conflicting topics and subscription if two (or more) clusters used the same Bus, Channel and Subscription names. This would result in the misdelivery of messages between the clusters. Now the UID of the Bus/ClusterBus that owns the topic/subscription is a factor in the identifier. The Bus UID will be stable durring The lifetime of the Bus, but unique between individual resources. Fixes #253 --- pkg/buses/gcppubsub/bus.go | 24 +++++++++++++++++------- pkg/buses/gcppubsub/dispatcher/main.go | 3 ++- pkg/buses/gcppubsub/provisioner/main.go | 6 ++++-- pkg/controller/bus/controller.go | 8 ++++++++ pkg/controller/clusterbus/controller.go | 8 ++++++++ 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index d8385b0e19d..d42be717163 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -18,6 +18,7 @@ package gcppubsub import ( "context" + "crypto/sha1" "fmt" "cloud.google.com/go/pubsub" @@ -29,7 +30,7 @@ import ( const BusType = "gcppubsub" type CloudPubSubBus struct { - ref buses.BusReference + id string reconciler *buses.Reconciler dispatcher buses.BusDispatcher provisioner buses.BusProvisioner @@ -40,7 +41,7 @@ type CloudPubSubBus struct { logger *zap.SugaredLogger } -func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusDispatcher(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -48,7 +49,7 @@ func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts } bus := &CloudPubSubBus{ - ref: ref, + id: busId(busUID), pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ @@ -70,7 +71,7 @@ func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts return bus, nil } -func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusProvisioner(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -78,7 +79,7 @@ func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts } bus := &CloudPubSubBus{ - ref: ref, + id: busId(busUID), pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ @@ -284,9 +285,18 @@ func (b *CloudPubSubBus) deleteSubscription(ref buses.SubscriptionReference) err } func (b *CloudPubSubBus) topicID(channel buses.ChannelReference) string { - return fmt.Sprintf("channel-%s-%s-%s", b.ref.Name, channel.Namespace, channel.Name) + return fmt.Sprintf("channel-%s-%s-%s", channel.Namespace, channel.Name, b.id) } func (b *CloudPubSubBus) subscriptionID(subscription buses.SubscriptionReference) string { - return fmt.Sprintf("subscription-%s-%s-%s", b.ref.Name, subscription.Namespace, subscription.Name) + return fmt.Sprintf("subscription-%s-%s-%s", subscription.Namespace, subscription.Name, b.id) +} + +// busId generates a short unique identifier for a bus from the Bus's UID. +// This value is used as a component of the topic and subscription IDs +func busId(uid string) string { + h := sha1.New() + h.Write([]byte(uid)) + bs := h.Sum(nil) + return fmt.Sprintf("%x", bs)[0:5] } diff --git a/pkg/buses/gcppubsub/dispatcher/main.go b/pkg/buses/gcppubsub/dispatcher/main.go index 8718b170e81..22a1e3f0c4a 100644 --- a/pkg/buses/gcppubsub/dispatcher/main.go +++ b/pkg/buses/gcppubsub/dispatcher/main.go @@ -35,6 +35,7 @@ func main() { os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) + busUID := os.Getenv("BUS_UID") config := buses.NewLoggingConfig() logger := buses.NewBusLoggerFromConfig(config) @@ -58,7 +59,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, busUID, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus dispatcher: %v", err) } diff --git a/pkg/buses/gcppubsub/provisioner/main.go b/pkg/buses/gcppubsub/provisioner/main.go index e2a4fd87384..3be8081d0bf 100644 --- a/pkg/buses/gcppubsub/provisioner/main.go +++ b/pkg/buses/gcppubsub/provisioner/main.go @@ -35,6 +35,7 @@ func main() { os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) + busUID := os.Getenv("BUS_UID") config := buses.NewLoggingConfig() logger := buses.NewBusLoggerFromConfig(config) @@ -51,13 +52,14 @@ func main() { } opts := &buses.BusOpts{ - Logger: logger} + Logger: logger, + } flag.StringVar(&opts.KubeConfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, busUID, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus provisioner: %v", err) } diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 88cae16d3b4..48097c6c6a2 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -634,6 +634,10 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "BUS_NAME", Value: bus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(bus.UID), + }, ) volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { @@ -691,6 +695,10 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "BUS_NAME", Value: bus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(bus.UID), + }, ) volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 8d0fe2cba34..22cbc81f8ec 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -556,6 +556,10 @@ func newDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.De Name: "BUS_NAME", Value: clusterBus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(clusterBus.UID), + }, ) volumes := []corev1.Volume{} if clusterBus.Spec.Volumes != nil { @@ -609,6 +613,10 @@ func newProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.D Name: "BUS_NAME", Value: clusterBus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(clusterBus.UID), + }, ) volumes := []corev1.Volume{} if clusterBus.Spec.Volumes != nil {