diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index d8385b0e19d..d42be717163 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -18,6 +18,7 @@ package gcppubsub import ( "context" + "crypto/sha1" "fmt" "cloud.google.com/go/pubsub" @@ -29,7 +30,7 @@ import ( const BusType = "gcppubsub" type CloudPubSubBus struct { - ref buses.BusReference + id string reconciler *buses.Reconciler dispatcher buses.BusDispatcher provisioner buses.BusProvisioner @@ -40,7 +41,7 @@ type CloudPubSubBus struct { logger *zap.SugaredLogger } -func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusDispatcher(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -48,7 +49,7 @@ func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts } bus := &CloudPubSubBus{ - ref: ref, + id: busId(busUID), pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ @@ -70,7 +71,7 @@ func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts return bus, nil } -func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { +func NewCloudPubSubBusProvisioner(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) { ctx := context.Background() pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -78,7 +79,7 @@ func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts } bus := &CloudPubSubBus{ - ref: ref, + id: busId(busUID), pubsubClient: pubsubClient, } eventHandlers := buses.EventHandlerFuncs{ @@ -284,9 +285,18 @@ func (b *CloudPubSubBus) deleteSubscription(ref buses.SubscriptionReference) err } func (b *CloudPubSubBus) topicID(channel buses.ChannelReference) string { - return fmt.Sprintf("channel-%s-%s-%s", b.ref.Name, channel.Namespace, channel.Name) + return fmt.Sprintf("channel-%s-%s-%s", channel.Namespace, channel.Name, b.id) } func (b *CloudPubSubBus) subscriptionID(subscription buses.SubscriptionReference) string { - return fmt.Sprintf("subscription-%s-%s-%s", b.ref.Name, subscription.Namespace, subscription.Name) + return fmt.Sprintf("subscription-%s-%s-%s", subscription.Namespace, subscription.Name, b.id) +} + +// busId generates a short unique identifier for a bus from the Bus's UID. +// This value is used as a component of the topic and subscription IDs +func busId(uid string) string { + h := sha1.New() + h.Write([]byte(uid)) + bs := h.Sum(nil) + return fmt.Sprintf("%x", bs)[0:5] } diff --git a/pkg/buses/gcppubsub/dispatcher/main.go b/pkg/buses/gcppubsub/dispatcher/main.go index 8718b170e81..22a1e3f0c4a 100644 --- a/pkg/buses/gcppubsub/dispatcher/main.go +++ b/pkg/buses/gcppubsub/dispatcher/main.go @@ -35,6 +35,7 @@ func main() { os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) + busUID := os.Getenv("BUS_UID") config := buses.NewLoggingConfig() logger := buses.NewBusLoggerFromConfig(config) @@ -58,7 +59,7 @@ func main() { flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, busUID, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus dispatcher: %v", err) } diff --git a/pkg/buses/gcppubsub/provisioner/main.go b/pkg/buses/gcppubsub/provisioner/main.go index e2a4fd87384..3be8081d0bf 100644 --- a/pkg/buses/gcppubsub/provisioner/main.go +++ b/pkg/buses/gcppubsub/provisioner/main.go @@ -35,6 +35,7 @@ func main() { os.Getenv("BUS_NAME"), os.Getenv("BUS_NAMESPACE"), ) + busUID := os.Getenv("BUS_UID") config := buses.NewLoggingConfig() logger := buses.NewBusLoggerFromConfig(config) @@ -51,13 +52,14 @@ func main() { } opts := &buses.BusOpts{ - Logger: logger} + Logger: logger, + } flag.StringVar(&opts.KubeConfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.Parse() - bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, projectID, opts) + bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, busUID, projectID, opts) if err != nil { logger.Fatalf("Error starting pub/sub bus provisioner: %v", err) } diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index 88cae16d3b4..48097c6c6a2 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -634,6 +634,10 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "BUS_NAME", Value: bus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(bus.UID), + }, ) volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { @@ -691,6 +695,10 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment { Name: "BUS_NAME", Value: bus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(bus.UID), + }, ) volumes := []corev1.Volume{} if bus.Spec.Volumes != nil { diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 8d0fe2cba34..22cbc81f8ec 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -556,6 +556,10 @@ func newDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.De Name: "BUS_NAME", Value: clusterBus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(clusterBus.UID), + }, ) volumes := []corev1.Volume{} if clusterBus.Spec.Volumes != nil { @@ -609,6 +613,10 @@ func newProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.D Name: "BUS_NAME", Value: clusterBus.Name, }, + corev1.EnvVar{ + Name: "BUS_UID", + Value: string(clusterBus.UID), + }, ) volumes := []corev1.Volume{} if clusterBus.Spec.Volumes != nil {