From 0c68acbe2364b76465da6c6d23baf093c1174509 Mon Sep 17 00:00:00 2001 From: ewezy Date: Wed, 26 Mar 2025 12:59:33 +0800 Subject: [PATCH] Add pubsub message ordering flag --- common/testutils/pubsub.go | 2 +- .../services/messagequeue/pubsub_message_queue_service.go | 5 +++-- .../testhelper/mockmanagement/server/server_test.go | 5 +++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/testutils/pubsub.go b/common/testutils/pubsub.go index 3f37ae6a..edf5313f 100644 --- a/common/testutils/pubsub.go +++ b/common/testutils/pubsub.go @@ -82,7 +82,7 @@ func CreateSubscriptions(client *pubsub.Client, ctx context.Context, topicNames topic := client.Topic(topicName) subscription, err := client.CreateSubscription(ctx, subscriptionId, - pubsub.SubscriptionConfig{Topic: topic, ExpirationPolicy: time.Hour * 24}) + pubsub.SubscriptionConfig{Topic: topic, ExpirationPolicy: time.Hour * 24, EnableMessageOrdering: true}) if err != nil { return nil, err } diff --git a/treatment-service/services/messagequeue/pubsub_message_queue_service.go b/treatment-service/services/messagequeue/pubsub_message_queue_service.go index 527759f9..9f620793 100644 --- a/treatment-service/services/messagequeue/pubsub_message_queue_service.go +++ b/treatment-service/services/messagequeue/pubsub_message_queue_service.go @@ -35,8 +35,9 @@ func newSubscriptionId(topic string) string { func newPubsubSubscription(ctx context.Context, client *pubsub.Client, topic string) (*pubsub.Subscription, error) { return client.CreateSubscription( ctx, newSubscriptionId(topic), pubsub.SubscriptionConfig{ - Topic: client.Topic(topic), - ExpirationPolicy: time.Hour * 24, + Topic: client.Topic(topic), + ExpirationPolicy: time.Hour * 24, + EnableMessageOrdering: true, }, ) } diff --git a/treatment-service/testhelper/mockmanagement/server/server_test.go b/treatment-service/testhelper/mockmanagement/server/server_test.go index 37aa90db..460ddc85 100644 --- a/treatment-service/testhelper/mockmanagement/server/server_test.go +++ b/treatment-service/testhelper/mockmanagement/server/server_test.go @@ -182,8 +182,9 @@ func (suite *ManagementServiceTestSuite) SetupTest() { topic := suite.pubsubClient.Topic(TOPIC) subscriptionId := "sub-" + uuid.NewString() subscription, err := suite.pubsubClient.CreateSubscription(suite.ctx, subscriptionId, pubsub.SubscriptionConfig{ - Topic: topic, - ExpirationPolicy: time.Hour * 24, + Topic: topic, + ExpirationPolicy: time.Hour * 24, + EnableMessageOrdering: true, }) if err != nil { suite.FailNow("fail to instantiate client", err.Error())