Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/pkg/initialize/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type WorkloadProps struct {
Image string
Platform manifest.PlatformArgsOrString
Topics []manifest.TopicSubscription
Queue manifest.SQSQueue
}

// JobProps contains the information needed to represent a Job.
Expand Down Expand Up @@ -370,6 +371,7 @@ func newWorkerServiceManifest(i *ServiceProps) (*manifest.WorkerService, error)
HealthCheck: i.HealthCheck,
Platform: i.Platform,
Topics: i.Topics,
Queue: i.Queue,
}), nil
}

Expand Down
48 changes: 48 additions & 0 deletions internal/pkg/initialize/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,54 @@ func TestWorkloadInitializer_Service(t *testing.T) {
}, "worker")
},
},
"topic subscriptions enabled with default fifo queue": {
inSvcType: manifest.WorkerServiceType,
inAppName: "app",
inSvcName: "worker",
inDockerfilePath: "worker/Dockerfile",
inSvcPort: 80,
inTopics: []manifest.TopicSubscription{
{
Name: aws.String("theTopic.fifo"),
Service: aws.String("publisher"),
},
},

mockWriter: func(m *mocks.MockWorkspace) {
// workspace root: "/worker"
gomock.InOrder(
m.EXPECT().Rel("worker/Dockerfile").Return("Dockerfile", nil),
m.EXPECT().Rel("/worker/manifest.yml").Return("manifest.yml", nil))
m.EXPECT().WriteServiceManifest(gomock.Any(), "worker").
Do(func(m *manifest.WorkerService, _ string) {
require.Equal(t, *m.Workload.Type, manifest.WorkerServiceType)
require.Equal(t, *m.Subscribe.Queue.FIFO.Enable, true)
require.Empty(t, m.ImageConfig.HealthCheck)
}).Return("/worker/manifest.yml", nil)
},
mockstore: func(m *mocks.MockStore) {
m.EXPECT().CreateService(gomock.Any()).
Do(func(app *config.Workload) {
require.Equal(t, &config.Workload{
Name: "worker",
App: "app",
Type: manifest.WorkerServiceType,
}, app)
}).
Return(nil)

m.EXPECT().GetApplication("app").Return(&config.Application{
Name: "app",
AccountID: "1234",
}, nil)
},
mockappDeployer: func(m *mocks.MockWorkloadAdder) {
m.EXPECT().AddServiceToApp(&config.Application{
Name: "app",
AccountID: "1234",
}, "worker")
},
},
}

for name, tc := range testCases {
Expand Down
22 changes: 22 additions & 0 deletions internal/pkg/manifest/initial_manifest_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,28 @@ func TestWorkerSvc_InitialManifestIntegration(t *testing.T) {
},
wantedTestdata: "worker-svc-subscribe.yml",
},
"with fifo topic subscription with default fifo queue": {
inProps: WorkerServiceProps{
WorkloadProps: WorkloadProps{
Name: "testers",
Dockerfile: "./testers/Dockerfile",
},
Platform: PlatformArgsOrString{
PlatformString: nil,
PlatformArgs: PlatformArgs{
OSFamily: nil,
Arch: nil,
},
},
Topics: []TopicSubscription{
{
Name: aws.String("testTopic.fifo"),
Service: aws.String("service4TestTopic"),
},
},
},
wantedTestdata: "worker-svc-with-default-fifo-queue.yml",
},
}

for name, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# The manifest for the "testers" service.
# Read the full specification for the "Worker Service" type at:
# https://aws.github.io/copilot-cli/docs/manifest/worker-service/

# Your service name will be used in naming your resources like log groups, ECS services, etc.
name: testers
type: Worker Service

# Configuration for your containers and service.
image:
# Docker build arguments.
build: ./testers/Dockerfile

cpu: 256 # Number of CPU units for the task.
memory: 512 # Amount of memory in MiB used by the task.
count: 1 # Number of tasks that should be running in your service.
exec: true # Enable running commands in your container.

# The events can be be received from an SQS queue via the env var $COPILOT_QUEUE_URI.
subscribe:
topics:
- name: testTopic
service: service4TestTopic
queue:
fifo: true

# Optional fields for more advanced use-cases.
#
#variables: # Pass environment variables as key value pairs.
# LOG_LEVEL: info

#secrets: # Pass secrets from AWS Systems Manager (SSM) Parameter Store.
# GITHUB_TOKEN: GITHUB_TOKEN # The key is the name of the environment variable, the value is the name of the SSM parameter.

# You can override any of the values defined above by environment.
#environments:
# test:
# count: 2 # Number of tasks to run for the "test" environment.
# deployment: # The deployment strategy for the "test" environment.
# rolling: 'recreate' # Stops existing tasks before new ones are started for faster deployments.
35 changes: 35 additions & 0 deletions internal/pkg/manifest/worker_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package manifest

import (
"errors"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -190,6 +191,7 @@ type WorkerServiceProps struct {
HealthCheck ContainerHealthCheck // Optional healthcheck configuration.
Platform PlatformArgsOrString // Optional platform configuration.
Topics []TopicSubscription // Optional topics for subscriptions
Queue SQSQueue // Optional queue configuration.
}

// NewWorkerService applies the props to a default Worker service configuration with
Expand All @@ -206,12 +208,45 @@ func NewWorkerService(props WorkerServiceProps) *WorkerService {
svc.WorkerServiceConfig.TaskConfig.CPU = aws.Int(MinWindowsTaskCPU)
svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory)
}
if len(props.Topics) > 0 {
setSubscriptionQueueDefaults(props.Topics, &svc.WorkerServiceConfig.Subscribe.Queue)
}
svc.WorkerServiceConfig.Subscribe.Topics = props.Topics
svc.WorkerServiceConfig.Platform = props.Platform
svc.parser = template.New()
return svc
}

// setSubscriptionQueueDefaults function modifies the manifest to have
// 1. FIFO Topic names without ".fifo" suffix.
// 2. If there are both FIFO and Standard topic subscriptions are specified then set
// default events queue to FIFO and add standard topic-specific queue for all the standard topic subscriptions.
// 3. If there are only Standard topic subscriptions are specified then do nothing and return.
func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQueue) {
var isFIFOEnabled bool
for _, topic := range topics {
if isFIFO(aws.StringValue(topic.Name)) {
isFIFOEnabled = true
break
}
}
if !isFIFOEnabled {
return
}
eventsQueue.FIFO.Enable = aws.Bool(true)
for idx, topic := range topics {
if isFIFO(aws.StringValue(topic.Name)) {
topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo"))
} else {
topics[idx].Queue.Enabled = aws.Bool(true)
}
}
}

func isFIFO(topic string) bool {
return strings.HasSuffix(topic, ".fifo")
}

// MarshalBinary serializes the manifest object into a binary YAML document.
// Implements the encoding.BinaryMarshaler interface.
func (s *WorkerService) MarshalBinary() ([]byte, error) {
Expand Down
Loading