Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions pkg/buses/gcppubsub/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gcppubsub

import (
"context"
"crypto/sha1"
"fmt"

"cloud.google.com/go/pubsub"
Expand All @@ -29,7 +30,7 @@ import (
const BusType = "gcppubsub"

type CloudPubSubBus struct {
ref buses.BusReference
id string
reconciler *buses.Reconciler
dispatcher buses.BusDispatcher
provisioner buses.BusProvisioner
Expand All @@ -40,15 +41,15 @@ type CloudPubSubBus struct {
logger *zap.SugaredLogger
}

func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) {
func NewCloudPubSubBusDispatcher(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) {
ctx := context.Background()
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, err
}

bus := &CloudPubSubBus{
ref: ref,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is ref in this context? It looks like it is being passed in but I would assume a ref to a bus would point to the object that is being created with &CloudPubSubBus{...}

id: busId(busUID),
pubsubClient: pubsubClient,
}
eventHandlers := buses.EventHandlerFuncs{
Expand All @@ -70,15 +71,15 @@ func NewCloudPubSubBusDispatcher(ref buses.BusReference, projectID string, opts
return bus, nil
}

func NewCloudPubSubBusProvisioner(ref buses.BusReference, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) {
func NewCloudPubSubBusProvisioner(ref buses.BusReference, busUID string, projectID string, opts *buses.BusOpts) (*CloudPubSubBus, error) {
ctx := context.Background()
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, err
}

bus := &CloudPubSubBus{
ref: ref,
id: busId(busUID),
Copy link
Copy Markdown
Contributor

@n3wscott n3wscott Sep 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than storing the id like this I wonder if it might be cleaner to set a topic prefix field that is generated from the id and our special strings (like channel- and then topicID becomes

func (b *CloudPubSubBus) topicID(channel buses.ChannelReference) string {
	return fmt.Sprintf("%s-%s-%s", b.topicPrefix, channel.Namespace, channel.Name)
}

And then we are free to change how we generate that prefix, and the code does not have to change each time.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should store the generated UID in the status so we can recover it in the future.

Additionally, storing the topic ID means that we don't get thrown off later if we change the busID function.

Since this is the old Bus code, it's probably not worth updating the status to support this, but we should probably include a runtime.RawExtension in the status of the new Channel to support recording controller choices like this.

pubsubClient: pubsubClient,
}
eventHandlers := buses.EventHandlerFuncs{
Expand Down Expand Up @@ -284,9 +285,18 @@ func (b *CloudPubSubBus) deleteSubscription(ref buses.SubscriptionReference) err
}

func (b *CloudPubSubBus) topicID(channel buses.ChannelReference) string {
return fmt.Sprintf("channel-%s-%s-%s", b.ref.Name, channel.Namespace, channel.Name)
return fmt.Sprintf("channel-%s-%s-%s", channel.Namespace, channel.Name, b.id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it intended to flip the order of this? it use to be general bus to specific channel.

}

func (b *CloudPubSubBus) subscriptionID(subscription buses.SubscriptionReference) string {
return fmt.Sprintf("subscription-%s-%s-%s", b.ref.Name, subscription.Namespace, subscription.Name)
return fmt.Sprintf("subscription-%s-%s-%s", subscription.Namespace, subscription.Name, b.id)
}

// busId generates a short unique identifier for a bus from the Bus's UID.
// This value is used as a component of the topic and subscription IDs
func busId(uid string) string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no test added for this method. Please add a test to make sure this does not make the same busID for similar UIDs

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In go style, busId should be busID

h := sha1.New()
h.Write([]byte(uid))
bs := h.Sum(nil)
return fmt.Sprintf("%x", bs)[0:5]
}
3 changes: 2 additions & 1 deletion pkg/buses/gcppubsub/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {
os.Getenv("BUS_NAME"),
os.Getenv("BUS_NAMESPACE"),
)
busUID := os.Getenv("BUS_UID")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if there is no BUS_UID?


config := buses.NewLoggingConfig()
logger := buses.NewBusLoggerFromConfig(config)
Expand All @@ -58,7 +59,7 @@ func main() {
flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.Parse()

bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, projectID, opts)
bus, err := gcppubsub.NewCloudPubSubBusDispatcher(ref, busUID, projectID, opts)
if err != nil {
logger.Fatalf("Error starting pub/sub bus dispatcher: %v", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/buses/gcppubsub/provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {
os.Getenv("BUS_NAME"),
os.Getenv("BUS_NAMESPACE"),
)
busUID := os.Getenv("BUS_UID")

config := buses.NewLoggingConfig()
logger := buses.NewBusLoggerFromConfig(config)
Expand All @@ -51,13 +52,14 @@ func main() {
}

opts := &buses.BusOpts{
Logger: logger}
Logger: logger,
}

flag.StringVar(&opts.KubeConfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&opts.MasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.Parse()

bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, projectID, opts)
bus, err := gcppubsub.NewCloudPubSubBusProvisioner(ref, busUID, projectID, opts)
if err != nil {
logger.Fatalf("Error starting pub/sub bus provisioner: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/bus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,10 @@ func newDispatcherDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment {
Name: "BUS_NAME",
Value: bus.Name,
},
corev1.EnvVar{
Name: "BUS_UID",
Value: string(bus.UID),
},
)
volumes := []corev1.Volume{}
if bus.Spec.Volumes != nil {
Expand Down Expand Up @@ -691,6 +695,10 @@ func newProvisionerDeployment(bus *channelsv1alpha1.Bus) *appsv1.Deployment {
Name: "BUS_NAME",
Value: bus.Name,
},
corev1.EnvVar{
Name: "BUS_UID",
Value: string(bus.UID),
},
)
volumes := []corev1.Volume{}
if bus.Spec.Volumes != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/clusterbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ func newDispatcherDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.De
Name: "BUS_NAME",
Value: clusterBus.Name,
},
corev1.EnvVar{
Name: "BUS_UID",
Value: string(clusterBus.UID),
},
)
volumes := []corev1.Volume{}
if clusterBus.Spec.Volumes != nil {
Expand Down Expand Up @@ -609,6 +613,10 @@ func newProvisionerDeployment(clusterBus *channelsv1alpha1.ClusterBus) *appsv1.D
Name: "BUS_NAME",
Value: clusterBus.Name,
},
corev1.EnvVar{
Name: "BUS_UID",
Value: string(clusterBus.UID),
},
)
volumes := []corev1.Volume{}
if clusterBus.Spec.Volumes != nil {
Expand Down