Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions tavern/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,23 +149,18 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
subShellOutput = EnvPubSubSubscriptionShellOutput.String()
)

pubOutput, err := pubsub.OpenTopic(ctx, topicShellOutput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub topic (%q): %v", topicShellOutput, err)
}

// For GCP, messages for a "Subscription" are load-balanced across all of the "Subscribers" to that same "Subscription"
// This means we must make a new "Subscription" in GCP for each instance of tavern to ensure they all receive the
// appropriate input/output from shells. For more information, see the information here:
// https://cloud.google.com/pubsub/docs/pubsub-basics#choose_a_publish_and_subscribe_pattern
if strings.HasPrefix(subShellInput, "gcppubsub://") && strings.HasPrefix(subShellOutput, "gcppubsub://") {
if projectID == "" {
log.Fatalf("must set value for %q when using gcppubsub:// in configuration", EnvGCPProjectID.Key)
log.Fatalf("[FATAL] must set value for %q when using gcppubsub:// in configuration", EnvGCPProjectID.Key)
}

client, err := gcppubsub.NewClient(ctx, projectID)
if err != nil {
panic(fmt.Errorf("failed to create gcppubsub client needed to create a new subscription"))
panic(fmt.Errorf("failed to create gcppubsub client needed to create a new subscription: %v", err))
}
defer client.Close()

Expand All @@ -178,7 +173,7 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
ExpirationPolicy: 24 * time.Hour, // Automatically delete unused subscriptions after 1 day
})
if err != nil {
panic(fmt.Errorf("failed to create gcppubsub subscription, to disable creation do not use the 'gcppubsub://' prefix for the environment variable %q: %v", EnvPubSubSubscriptionShellInput.Key, err))
panic(fmt.Errorf("failed to create gcppubsub subscription (topic=%q), to disable creation do not use the 'gcppubsub://' prefix for the environment variable %q: %v", topic.ID(), EnvPubSubSubscriptionShellInput.Key, err))
}
exists, err := sub.Exists(ctx)
if err != nil {
Expand All @@ -190,14 +185,19 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
return name
}

shellInputTopic := client.Topic(strings.TrimPrefix(EnvPubSubTopicShellInput.String(), "gcppubsub://"))
shellOutputTopic := client.Topic(strings.TrimPrefix(EnvPubSubTopicShellInput.String(), "gcppubsub://"))
shellInputTopic := client.Topic(strings.TrimPrefix(topicShellInput, "gcppubsub://"))
shellOutputTopic := client.Topic(strings.TrimPrefix(topicShellOutput, "gcppubsub://"))

// Overwrite env var specification with newly created GCP PubSub Subscriptions
subShellInput = fmt.Sprintf("gcpubsub://%s", createGCPSubscription(ctx, EnvPubSubSubscriptionShellInput, shellInputTopic))
subShellOutput = fmt.Sprintf("gcpubsub://%s", createGCPSubscription(ctx, EnvPubSubSubscriptionShellOutput, shellOutputTopic))
}

pubOutput, err := pubsub.OpenTopic(ctx, topicShellOutput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub topic (%q): %v", topicShellOutput, err)
}

subOutput, err := pubsub.OpenSubscription(ctx, subShellOutput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub subscription (%q): %v", subShellOutput, err)
Expand Down