From d68b119d7b7d494ac6eeab2c2aadeda07de4bfeb Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Mon, 3 Oct 2022 16:52:04 -0700 Subject: [PATCH 1/7] fix(FIFO): worker service manifest with fifo subscriptions --- internal/pkg/initialize/workload.go | 2 + internal/pkg/initialize/workload_test.go | 48 +++++++++++++++++++ internal/pkg/manifest/worker_svc.go | 10 ++++ .../workloads/services/worker/manifest.yml | 4 ++ 4 files changed, 64 insertions(+) diff --git a/internal/pkg/initialize/workload.go b/internal/pkg/initialize/workload.go index 84bc50442c0..2ef4c6d822c 100644 --- a/internal/pkg/initialize/workload.go +++ b/internal/pkg/initialize/workload.go @@ -67,6 +67,7 @@ type WorkloadProps struct { Image string Platform manifest.PlatformArgsOrString Topics []manifest.TopicSubscription + Queue manifest.SQSQueue } // JobProps contains the information needed to represent a Job. @@ -370,6 +371,7 @@ func newWorkerServiceManifest(i *ServiceProps) (*manifest.WorkerService, error) HealthCheck: i.HealthCheck, Platform: i.Platform, Topics: i.Topics, + Queue: i.Queue, }), nil } diff --git a/internal/pkg/initialize/workload_test.go b/internal/pkg/initialize/workload_test.go index 307eff4c6cf..1b4ffb00ed1 100644 --- a/internal/pkg/initialize/workload_test.go +++ b/internal/pkg/initialize/workload_test.go @@ -735,6 +735,54 @@ func TestWorkloadInitializer_Service(t *testing.T) { }, "worker") }, }, + "topic subscriptions enabled with default fifo queue": { + inSvcType: manifest.WorkerServiceType, + inAppName: "app", + inSvcName: "worker", + inDockerfilePath: "worker/Dockerfile", + inSvcPort: 80, + inTopics: []manifest.TopicSubscription{ + { + Name: aws.String("theTopic.fifo"), + Service: aws.String("publisher"), + }, + }, + + mockWriter: func(m *mocks.MockWorkspace) { + // workspace root: "/worker" + gomock.InOrder( + m.EXPECT().Rel("worker/Dockerfile").Return("Dockerfile", nil), + m.EXPECT().Rel("/worker/manifest.yml").Return("manifest.yml", nil)) + m.EXPECT().WriteServiceManifest(gomock.Any(), "worker"). + Do(func(m *manifest.WorkerService, _ string) { + require.Equal(t, *m.Workload.Type, manifest.WorkerServiceType) + require.Equal(t, *m.Subscribe.Queue.FIFO.Enable, true) + require.Empty(t, m.ImageConfig.HealthCheck) + }).Return("/worker/manifest.yml", nil) + }, + mockstore: func(m *mocks.MockStore) { + m.EXPECT().CreateService(gomock.Any()). + Do(func(app *config.Workload) { + require.Equal(t, &config.Workload{ + Name: "worker", + App: "app", + Type: manifest.WorkerServiceType, + }, app) + }). + Return(nil) + + m.EXPECT().GetApplication("app").Return(&config.Application{ + Name: "app", + AccountID: "1234", + }, nil) + }, + mockappDeployer: func(m *mocks.MockWorkloadAdder) { + m.EXPECT().AddServiceToApp(&config.Application{ + Name: "app", + AccountID: "1234", + }, "worker") + }, + }, } for name, tc := range testCases { diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 1d4f172ec37..090137077a7 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -5,6 +5,7 @@ package manifest import ( "errors" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -190,6 +191,7 @@ type WorkerServiceProps struct { HealthCheck ContainerHealthCheck // Optional healthcheck configuration. Platform PlatformArgsOrString // Optional platform configuration. Topics []TopicSubscription // Optional topics for subscriptions + Queue SQSQueue // Optional queue configuration. } // NewWorkerService applies the props to a default Worker service configuration with @@ -206,6 +208,14 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { svc.WorkerServiceConfig.TaskConfig.CPU = aws.Int(MinWindowsTaskCPU) svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory) } + if len(props.Topics) > 0 { + for idx, topic := range props.Topics { + if strings.Contains(aws.StringValue(topic.Name), ".fifo") { + props.Topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) + svc.WorkerServiceConfig.Subscribe.Queue.FIFO.Enable = aws.Bool(true) + } + } + } svc.WorkerServiceConfig.Subscribe.Topics = props.Topics svc.WorkerServiceConfig.Platform = props.Platform svc.parser = template.New() diff --git a/internal/pkg/template/templates/workloads/services/worker/manifest.yml b/internal/pkg/template/templates/workloads/services/worker/manifest.yml index 7663c1d16aa..3b1c7630c73 100644 --- a/internal/pkg/template/templates/workloads/services/worker/manifest.yml +++ b/internal/pkg/template/templates/workloads/services/worker/manifest.yml @@ -42,6 +42,10 @@ subscribe: - name: {{$topic.Name}} service: {{$topic.Service}} {{- end}} + {{- if and .Subscribe.Queue .Subscribe.Queue.FIFO .Subscribe.Queue.FIFO.Enable }} + queue: + fifo: true + {{- end }} {{- else}} # You can register to topics from other services. # The events can be be received from an SQS queue via the env var $COPILOT_QUEUE_URI. From 45ca6b2fc9dd18bd65307260a7b57a6e2ab161a4 Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Tue, 4 Oct 2022 12:30:02 -0700 Subject: [PATCH 2/7] address feedback --- .../initial_manifest_integration_test.go | 22 +++++++++++++++++++ internal/pkg/manifest/worker_svc.go | 18 ++++++++++----- .../workloads/services/worker/manifest.yml | 2 +- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/internal/pkg/manifest/initial_manifest_integration_test.go b/internal/pkg/manifest/initial_manifest_integration_test.go index 8a0e299fd2a..62163c058cc 100644 --- a/internal/pkg/manifest/initial_manifest_integration_test.go +++ b/internal/pkg/manifest/initial_manifest_integration_test.go @@ -161,6 +161,28 @@ func TestWorkerSvc_InitialManifestIntegration(t *testing.T) { }, wantedTestdata: "worker-svc-subscribe.yml", }, + "with fifo topic subscription with default fifo queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Platform: PlatformArgsOrString{ + PlatformString: nil, + PlatformArgs: PlatformArgs{ + OSFamily: nil, + Arch: nil, + }, + }, + Topics: []TopicSubscription{ + { + Name: aws.String("testTopic.fifo"), + Service: aws.String("service4TestTopic"), + }, + }, + }, + wantedTestdata: "worker-svc-with-default-fifo-queue.yml", + }, } for name, tc := range testCases { diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 090137077a7..079d552d9e6 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -209,11 +209,8 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory) } if len(props.Topics) > 0 { - for idx, topic := range props.Topics { - if strings.Contains(aws.StringValue(topic.Name), ".fifo") { - props.Topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) - svc.WorkerServiceConfig.Subscribe.Queue.FIFO.Enable = aws.Bool(true) - } + if aws.BoolValue(fifoEnabled(props.Topics)) { + svc.WorkerServiceConfig.Subscribe.Queue.FIFO.Enable = aws.Bool(true) } } svc.WorkerServiceConfig.Subscribe.Topics = props.Topics @@ -222,6 +219,17 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { return svc } +func fifoEnabled(topics []TopicSubscription) *bool { + var isFIFOEnabled bool + for idx, topic := range topics { + if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { + topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) + isFIFOEnabled = true + } + } + return &isFIFOEnabled +} + // MarshalBinary serializes the manifest object into a binary YAML document. // Implements the encoding.BinaryMarshaler interface. func (s *WorkerService) MarshalBinary() ([]byte, error) { diff --git a/internal/pkg/template/templates/workloads/services/worker/manifest.yml b/internal/pkg/template/templates/workloads/services/worker/manifest.yml index 3b1c7630c73..58f7086a99f 100644 --- a/internal/pkg/template/templates/workloads/services/worker/manifest.yml +++ b/internal/pkg/template/templates/workloads/services/worker/manifest.yml @@ -44,7 +44,7 @@ subscribe: {{- end}} {{- if and .Subscribe.Queue .Subscribe.Queue.FIFO .Subscribe.Queue.FIFO.Enable }} queue: - fifo: true + fifo: {{ .Subscribe.Queue.FIFO.Enable }} {{- end }} {{- else}} # You can register to topics from other services. From db9fb8f6b1af01622f194a69dacc4a85c2e48314 Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Tue, 4 Oct 2022 12:35:40 -0700 Subject: [PATCH 3/7] add manifest test file --- .../worker-svc-with-default-fifo-queue.yml | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml diff --git a/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml b/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml new file mode 100644 index 00000000000..00e1da5e041 --- /dev/null +++ b/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml @@ -0,0 +1,40 @@ +# The manifest for the "testers" service. +# Read the full specification for the "Worker Service" type at: +# https://aws.github.io/copilot-cli/docs/manifest/worker-service/ + +# Your service name will be used in naming your resources like log groups, ECS services, etc. +name: testers +type: Worker Service + +# Configuration for your containers and service. +image: + # Docker build arguments. + build: ./testers/Dockerfile + +cpu: 256 # Number of CPU units for the task. +memory: 512 # Amount of memory in MiB used by the task. +count: 1 # Number of tasks that should be running in your service. +exec: true # Enable running commands in your container. + +# The events can be be received from an SQS queue via the env var $COPILOT_QUEUE_URI. +subscribe: + topics: + - name: testTopic + service: service4TestTopic + queue: + fifo: true + +# Optional fields for more advanced use-cases. +# +#variables: # Pass environment variables as key value pairs. +# LOG_LEVEL: info + +#secrets: # Pass secrets from AWS Systems Manager (SSM) Parameter Store. +# GITHUB_TOKEN: GITHUB_TOKEN # The key is the name of the environment variable, the value is the name of the SSM parameter. + +# You can override any of the values defined above by environment. +#environments: +# test: +# count: 2 # Number of tasks to run for the "test" environment. +# deployment: # The deployment strategy for the "test" environment. +# rolling: 'recreate' # Stops existing tasks before new ones are started for faster deployments. From fd49c87dd8066205e04002a2819d1e9a37efb9ec Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Fri, 7 Oct 2022 11:10:53 -0700 Subject: [PATCH 4/7] add topic specific queue when required --- internal/pkg/manifest/worker_svc.go | 21 +- internal/pkg/manifest/worker_svc_test.go | 217 ++++++++++++++++++ .../workloads/services/worker/manifest.yml | 3 + 3 files changed, 236 insertions(+), 5 deletions(-) diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 079d552d9e6..20a64381c5c 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -209,7 +209,7 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory) } if len(props.Topics) > 0 { - if aws.BoolValue(fifoEnabled(props.Topics)) { + if aws.BoolValue(FIFOEnabled(props.Topics)) { svc.WorkerServiceConfig.Subscribe.Queue.FIFO.Enable = aws.Bool(true) } } @@ -219,15 +219,26 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { return svc } -func fifoEnabled(topics []TopicSubscription) *bool { +func FIFOEnabled(topics []TopicSubscription) *bool { var isFIFOEnabled bool - for idx, topic := range topics { + for _, topic := range topics { if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { - topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) isFIFOEnabled = true + break + } + } + if !isFIFOEnabled { + return aws.Bool(isFIFOEnabled) + } else { + for idx, topic := range topics { + if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { + topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) + } else { + topics[idx].Queue.Enabled = aws.Bool(true) + } } } - return &isFIFOEnabled + return aws.Bool(isFIFOEnabled) } // MarshalBinary serializes the manifest object into a binary YAML document. diff --git a/internal/pkg/manifest/worker_svc_test.go b/internal/pkg/manifest/worker_svc_test.go index 9edbc38779c..110dd4cf198 100644 --- a/internal/pkg/manifest/worker_svc_test.go +++ b/internal/pkg/manifest/worker_svc_test.go @@ -142,6 +142,223 @@ func TestNewWorkerSvc(t *testing.T) { }, }, }, + "should return a worker service instance with 2 subscriptions to the default fifo queue and 2 standard topic specific queues": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1.fifo"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2.fifo"), + Service: aws.String("fifoService2"), + }, + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2"), + Service: aws.String("fifoService2"), + }, + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + Queue: SQSQueueOrBool{Enabled: aws.Bool(true)}, + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + Queue: SQSQueueOrBool{Enabled: aws.Bool(true)}, + }, + }, + Queue: SQSQueue{ + FIFO: FIFOAdvanceConfigOrBool{Enable: aws.Bool(true)}, + }, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, + "should return a worker service instance with 2 subscriptions to the default fifo queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1.fifo"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2.fifo"), + Service: aws.String("fifoService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2"), + Service: aws.String("fifoService2"), + }, + }, + Queue: SQSQueue{ + FIFO: FIFOAdvanceConfigOrBool{Enable: aws.Bool(true)}, + }, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, + "should return a worker service instance with 2 subscriptions to the default standard queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + Queue: SQSQueue{}, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, } for name, tc := range testCases { diff --git a/internal/pkg/template/templates/workloads/services/worker/manifest.yml b/internal/pkg/template/templates/workloads/services/worker/manifest.yml index 58f7086a99f..469e8058fa2 100644 --- a/internal/pkg/template/templates/workloads/services/worker/manifest.yml +++ b/internal/pkg/template/templates/workloads/services/worker/manifest.yml @@ -41,6 +41,9 @@ subscribe: {{- range $topic := .Subscribe.Topics}} - name: {{$topic.Name}} service: {{$topic.Service}} + {{- if $topic.Queue.Enabled }} + queue: {{ $topic.Queue.Enabled }} + {{- end }} {{- end}} {{- if and .Subscribe.Queue .Subscribe.Queue.FIFO .Subscribe.Queue.FIFO.Enable }} queue: From c323d47312f051241642274b7bd359444d36f4d0 Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Fri, 7 Oct 2022 15:41:55 -0700 Subject: [PATCH 5/7] address feedback --- internal/pkg/manifest/worker_svc.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 20a64381c5c..8e292dd0801 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -209,9 +209,7 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory) } if len(props.Topics) > 0 { - if aws.BoolValue(FIFOEnabled(props.Topics)) { - svc.WorkerServiceConfig.Subscribe.Queue.FIFO.Enable = aws.Bool(true) - } + setSubscriptionQueueDefaults(props.Topics, &svc.WorkerServiceConfig.Subscribe.Queue) } svc.WorkerServiceConfig.Subscribe.Topics = props.Topics svc.WorkerServiceConfig.Platform = props.Platform @@ -219,7 +217,7 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { return svc } -func FIFOEnabled(topics []TopicSubscription) *bool { +func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQueue) { var isFIFOEnabled bool for _, topic := range topics { if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { @@ -228,17 +226,16 @@ func FIFOEnabled(topics []TopicSubscription) *bool { } } if !isFIFOEnabled { - return aws.Bool(isFIFOEnabled) - } else { - for idx, topic := range topics { - if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { - topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) - } else { - topics[idx].Queue.Enabled = aws.Bool(true) - } + return + } + eventsQueue.FIFO.Enable = aws.Bool(true) + for idx, topic := range topics { + if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { + topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) + } else { + topics[idx].Queue.Enabled = aws.Bool(true) } } - return aws.Bool(isFIFOEnabled) } // MarshalBinary serializes the manifest object into a binary YAML document. From 72595320876c1d0a8f5e02fae75532ccdaee43b5 Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Mon, 10 Oct 2022 12:50:57 -0700 Subject: [PATCH 6/7] nit --- .../template/templates/workloads/services/worker/manifest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/template/templates/workloads/services/worker/manifest.yml b/internal/pkg/template/templates/workloads/services/worker/manifest.yml index 469e8058fa2..a2f36c7a064 100644 --- a/internal/pkg/template/templates/workloads/services/worker/manifest.yml +++ b/internal/pkg/template/templates/workloads/services/worker/manifest.yml @@ -45,7 +45,7 @@ subscribe: queue: {{ $topic.Queue.Enabled }} {{- end }} {{- end}} - {{- if and .Subscribe.Queue .Subscribe.Queue.FIFO .Subscribe.Queue.FIFO.Enable }} + {{- if .Subscribe.Queue.FIFO.Enable }} queue: fifo: {{ .Subscribe.Queue.FIFO.Enable }} {{- end }} From ee910dd1203c5686fedcf4e73f1e681144f0e42e Mon Sep 17 00:00:00 2001 From: Parag Bhingre Date: Mon, 10 Oct 2022 14:25:14 -0700 Subject: [PATCH 7/7] nits --- internal/pkg/manifest/worker_svc.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 8e292dd0801..0dc91613495 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -217,10 +217,15 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { return svc } +// setSubscriptionQueueDefaults function modifies the manifest to have +// 1. FIFO Topic names without ".fifo" suffix. +// 2. If there are both FIFO and Standard topic subscriptions are specified then set +// default events queue to FIFO and add standard topic-specific queue for all the standard topic subscriptions. +// 3. If there are only Standard topic subscriptions are specified then do nothing and return. func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQueue) { var isFIFOEnabled bool for _, topic := range topics { - if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { + if isFIFO(aws.StringValue(topic.Name)) { isFIFOEnabled = true break } @@ -230,7 +235,7 @@ func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQu } eventsQueue.FIFO.Enable = aws.Bool(true) for idx, topic := range topics { - if strings.HasSuffix(aws.StringValue(topic.Name), ".fifo") { + if isFIFO(aws.StringValue(topic.Name)) { topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) } else { topics[idx].Queue.Enabled = aws.Bool(true) @@ -238,6 +243,10 @@ func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQu } } +func isFIFO(topic string) bool { + return strings.HasSuffix(topic, ".fifo") +} + // MarshalBinary serializes the manifest object into a binary YAML document. // Implements the encoding.BinaryMarshaler interface. func (s *WorkerService) MarshalBinary() ([]byte, error) {