diff --git a/internal/pkg/initialize/workload.go b/internal/pkg/initialize/workload.go index 84bc50442c0..2ef4c6d822c 100644 --- a/internal/pkg/initialize/workload.go +++ b/internal/pkg/initialize/workload.go @@ -67,6 +67,7 @@ type WorkloadProps struct { Image string Platform manifest.PlatformArgsOrString Topics []manifest.TopicSubscription + Queue manifest.SQSQueue } // JobProps contains the information needed to represent a Job. @@ -370,6 +371,7 @@ func newWorkerServiceManifest(i *ServiceProps) (*manifest.WorkerService, error) HealthCheck: i.HealthCheck, Platform: i.Platform, Topics: i.Topics, + Queue: i.Queue, }), nil } diff --git a/internal/pkg/initialize/workload_test.go b/internal/pkg/initialize/workload_test.go index 307eff4c6cf..1b4ffb00ed1 100644 --- a/internal/pkg/initialize/workload_test.go +++ b/internal/pkg/initialize/workload_test.go @@ -735,6 +735,54 @@ func TestWorkloadInitializer_Service(t *testing.T) { }, "worker") }, }, + "topic subscriptions enabled with default fifo queue": { + inSvcType: manifest.WorkerServiceType, + inAppName: "app", + inSvcName: "worker", + inDockerfilePath: "worker/Dockerfile", + inSvcPort: 80, + inTopics: []manifest.TopicSubscription{ + { + Name: aws.String("theTopic.fifo"), + Service: aws.String("publisher"), + }, + }, + + mockWriter: func(m *mocks.MockWorkspace) { + // workspace root: "/worker" + gomock.InOrder( + m.EXPECT().Rel("worker/Dockerfile").Return("Dockerfile", nil), + m.EXPECT().Rel("/worker/manifest.yml").Return("manifest.yml", nil)) + m.EXPECT().WriteServiceManifest(gomock.Any(), "worker"). + Do(func(m *manifest.WorkerService, _ string) { + require.Equal(t, *m.Workload.Type, manifest.WorkerServiceType) + require.Equal(t, *m.Subscribe.Queue.FIFO.Enable, true) + require.Empty(t, m.ImageConfig.HealthCheck) + }).Return("/worker/manifest.yml", nil) + }, + mockstore: func(m *mocks.MockStore) { + m.EXPECT().CreateService(gomock.Any()). + Do(func(app *config.Workload) { + require.Equal(t, &config.Workload{ + Name: "worker", + App: "app", + Type: manifest.WorkerServiceType, + }, app) + }). + Return(nil) + + m.EXPECT().GetApplication("app").Return(&config.Application{ + Name: "app", + AccountID: "1234", + }, nil) + }, + mockappDeployer: func(m *mocks.MockWorkloadAdder) { + m.EXPECT().AddServiceToApp(&config.Application{ + Name: "app", + AccountID: "1234", + }, "worker") + }, + }, } for name, tc := range testCases { diff --git a/internal/pkg/manifest/initial_manifest_integration_test.go b/internal/pkg/manifest/initial_manifest_integration_test.go index 8a0e299fd2a..62163c058cc 100644 --- a/internal/pkg/manifest/initial_manifest_integration_test.go +++ b/internal/pkg/manifest/initial_manifest_integration_test.go @@ -161,6 +161,28 @@ func TestWorkerSvc_InitialManifestIntegration(t *testing.T) { }, wantedTestdata: "worker-svc-subscribe.yml", }, + "with fifo topic subscription with default fifo queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Platform: PlatformArgsOrString{ + PlatformString: nil, + PlatformArgs: PlatformArgs{ + OSFamily: nil, + Arch: nil, + }, + }, + Topics: []TopicSubscription{ + { + Name: aws.String("testTopic.fifo"), + Service: aws.String("service4TestTopic"), + }, + }, + }, + wantedTestdata: "worker-svc-with-default-fifo-queue.yml", + }, } for name, tc := range testCases { diff --git a/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml b/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml new file mode 100644 index 00000000000..00e1da5e041 --- /dev/null +++ b/internal/pkg/manifest/testdata/worker-svc-with-default-fifo-queue.yml @@ -0,0 +1,40 @@ +# The manifest for the "testers" service. +# Read the full specification for the "Worker Service" type at: +# https://aws.github.io/copilot-cli/docs/manifest/worker-service/ + +# Your service name will be used in naming your resources like log groups, ECS services, etc. +name: testers +type: Worker Service + +# Configuration for your containers and service. +image: + # Docker build arguments. + build: ./testers/Dockerfile + +cpu: 256 # Number of CPU units for the task. +memory: 512 # Amount of memory in MiB used by the task. +count: 1 # Number of tasks that should be running in your service. +exec: true # Enable running commands in your container. + +# The events can be be received from an SQS queue via the env var $COPILOT_QUEUE_URI. +subscribe: + topics: + - name: testTopic + service: service4TestTopic + queue: + fifo: true + +# Optional fields for more advanced use-cases. +# +#variables: # Pass environment variables as key value pairs. +# LOG_LEVEL: info + +#secrets: # Pass secrets from AWS Systems Manager (SSM) Parameter Store. +# GITHUB_TOKEN: GITHUB_TOKEN # The key is the name of the environment variable, the value is the name of the SSM parameter. + +# You can override any of the values defined above by environment. +#environments: +# test: +# count: 2 # Number of tasks to run for the "test" environment. +# deployment: # The deployment strategy for the "test" environment. +# rolling: 'recreate' # Stops existing tasks before new ones are started for faster deployments. diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index 1d4f172ec37..0dc91613495 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -5,6 +5,7 @@ package manifest import ( "errors" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -190,6 +191,7 @@ type WorkerServiceProps struct { HealthCheck ContainerHealthCheck // Optional healthcheck configuration. Platform PlatformArgsOrString // Optional platform configuration. Topics []TopicSubscription // Optional topics for subscriptions + Queue SQSQueue // Optional queue configuration. } // NewWorkerService applies the props to a default Worker service configuration with @@ -206,12 +208,45 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService { svc.WorkerServiceConfig.TaskConfig.CPU = aws.Int(MinWindowsTaskCPU) svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory) } + if len(props.Topics) > 0 { + setSubscriptionQueueDefaults(props.Topics, &svc.WorkerServiceConfig.Subscribe.Queue) + } svc.WorkerServiceConfig.Subscribe.Topics = props.Topics svc.WorkerServiceConfig.Platform = props.Platform svc.parser = template.New() return svc } +// setSubscriptionQueueDefaults function modifies the manifest to have +// 1. FIFO Topic names without ".fifo" suffix. +// 2. If there are both FIFO and Standard topic subscriptions are specified then set +// default events queue to FIFO and add standard topic-specific queue for all the standard topic subscriptions. +// 3. If there are only Standard topic subscriptions are specified then do nothing and return. +func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQueue) { + var isFIFOEnabled bool + for _, topic := range topics { + if isFIFO(aws.StringValue(topic.Name)) { + isFIFOEnabled = true + break + } + } + if !isFIFOEnabled { + return + } + eventsQueue.FIFO.Enable = aws.Bool(true) + for idx, topic := range topics { + if isFIFO(aws.StringValue(topic.Name)) { + topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo")) + } else { + topics[idx].Queue.Enabled = aws.Bool(true) + } + } +} + +func isFIFO(topic string) bool { + return strings.HasSuffix(topic, ".fifo") +} + // MarshalBinary serializes the manifest object into a binary YAML document. // Implements the encoding.BinaryMarshaler interface. func (s *WorkerService) MarshalBinary() ([]byte, error) { diff --git a/internal/pkg/manifest/worker_svc_test.go b/internal/pkg/manifest/worker_svc_test.go index 9edbc38779c..110dd4cf198 100644 --- a/internal/pkg/manifest/worker_svc_test.go +++ b/internal/pkg/manifest/worker_svc_test.go @@ -142,6 +142,223 @@ func TestNewWorkerSvc(t *testing.T) { }, }, }, + "should return a worker service instance with 2 subscriptions to the default fifo queue and 2 standard topic specific queues": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1.fifo"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2.fifo"), + Service: aws.String("fifoService2"), + }, + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2"), + Service: aws.String("fifoService2"), + }, + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + Queue: SQSQueueOrBool{Enabled: aws.Bool(true)}, + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + Queue: SQSQueueOrBool{Enabled: aws.Bool(true)}, + }, + }, + Queue: SQSQueue{ + FIFO: FIFOAdvanceConfigOrBool{Enable: aws.Bool(true)}, + }, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, + "should return a worker service instance with 2 subscriptions to the default fifo queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1.fifo"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2.fifo"), + Service: aws.String("fifoService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("fifoTopic1"), + Service: aws.String("fifoService1"), + }, + { + Name: aws.String("fifoTopic2"), + Service: aws.String("fifoService2"), + }, + }, + Queue: SQSQueue{ + FIFO: FIFOAdvanceConfigOrBool{Enable: aws.Bool(true)}, + }, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, + "should return a worker service instance with 2 subscriptions to the default standard queue": { + inProps: WorkerServiceProps{ + WorkloadProps: WorkloadProps{ + Name: "testers", + Dockerfile: "./testers/Dockerfile", + }, + Topics: []TopicSubscription{ + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + }, + wantedManifest: &WorkerService{ + Workload: Workload{ + Name: aws.String("testers"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{ + Build: BuildArgsOrString{ + BuildArgs: DockerBuildArgs{ + Dockerfile: aws.String("./testers/Dockerfile"), + }, + }, + }, + }, + Subscribe: SubscribeConfig{ + Topics: []TopicSubscription{ + { + Name: aws.String("standardTopic1"), + Service: aws.String("standardService1"), + }, + { + Name: aws.String("standardTopic2"), + Service: aws.String("standardService2"), + }, + }, + Queue: SQSQueue{}, + }, + TaskConfig: TaskConfig{ + CPU: aws.Int(256), + Memory: aws.Int(512), + Count: Count{ + Value: aws.Int(1), + }, + ExecuteCommand: ExecuteCommand{ + Enable: aws.Bool(false), + }, + }, + Network: NetworkConfig{ + VPC: vpcConfig{ + Placement: PlacementArgOrString{ + PlacementString: placementStringP(PublicSubnetPlacement), + }, + }, + }, + }, + }, + }, } for name, tc := range testCases { diff --git a/internal/pkg/template/templates/workloads/services/worker/manifest.yml b/internal/pkg/template/templates/workloads/services/worker/manifest.yml index 7663c1d16aa..a2f36c7a064 100644 --- a/internal/pkg/template/templates/workloads/services/worker/manifest.yml +++ b/internal/pkg/template/templates/workloads/services/worker/manifest.yml @@ -41,7 +41,14 @@ subscribe: {{- range $topic := .Subscribe.Topics}} - name: {{$topic.Name}} service: {{$topic.Service}} + {{- if $topic.Queue.Enabled }} + queue: {{ $topic.Queue.Enabled }} + {{- end }} {{- end}} + {{- if .Subscribe.Queue.FIFO.Enable }} + queue: + fifo: {{ .Subscribe.Queue.FIFO.Enable }} + {{- end }} {{- else}} # You can register to topics from other services. # The events can be be received from an SQS queue via the env var $COPILOT_QUEUE_URI.