From 7ee85eda203ed0b5b423290fc698a65e10bec61b Mon Sep 17 00:00:00 2001 From: eweziyi Date: Thu, 16 Jan 2025 15:40:41 +0800 Subject: [PATCH 1/9] Add field tags to poller configs --- treatment-service/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index a738b02..e8ca67f 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -96,8 +96,8 @@ type ManagementServiceConfig struct { } type ManagementServicePollerConfig struct { - Enabled bool `default:"false"` - PollInterval time.Duration `default:"30s"` + Enabled bool `json:"enabled" default:"false"` + PollInterval time.Duration `json:"poll_interval" default:"30s"` } func (c *Config) GetProjectIds() []models.ProjectId { From c920104c2b39baf9573e51b57ba3d163e9d8a623 Mon Sep 17 00:00:00 2001 From: eweziyi Date: Thu, 16 Jan 2025 16:05:06 +0800 Subject: [PATCH 2/9] Add poller config to plugin --- plugins/turing/config/config.go | 17 ++++++++------- plugins/turing/manager/experiment_manager.go | 23 ++++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/plugins/turing/config/config.go b/plugins/turing/config/config.go index c8ccffc..4bc0df4 100644 --- a/plugins/turing/config/config.go +++ b/plugins/turing/config/config.go @@ -46,14 +46,15 @@ type TreatmentServicePluginConfig struct { Port int `json:"port" default:"8080"` PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" validate:"required"` - AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig config.DebugConfig `json:"debug_config"` - DeploymentConfig config.DeploymentConfig `json:"deployment_config"` - ManagementService config.ManagementServiceConfig `json:"management_service"` - MonitoringConfig config.Monitoring `json:"monitoring_config"` - SwaggerConfig config.SwaggerConfig `json:"swagger_config"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` + AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig config.DebugConfig `json:"debug_config"` + DeploymentConfig config.DeploymentConfig `json:"deployment_config"` + ManagementService config.ManagementServiceConfig `json:"management_service"` + MonitoringConfig config.Monitoring `json:"monitoring_config"` + SwaggerConfig config.SwaggerConfig `json:"swagger_config"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + ManagementServicePollerConfig config.ManagementServicePollerConfig `json:"management_service_poller_config"` } type Variable struct { diff --git a/plugins/turing/manager/experiment_manager.go b/plugins/turing/manager/experiment_manager.go index b10ad2b..26b8681 100644 --- a/plugins/turing/manager/experiment_manager.go +++ b/plugins/turing/manager/experiment_manager.go @@ -142,17 +142,18 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig( projectID int, ) (*config.Config, error) { pluginConfig := &config.Config{ - Port: em.TreatmentServicePluginConfig.Port, - ProjectIds: []string{strconv.Itoa(projectID)}, - AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger, - DebugConfig: em.TreatmentServicePluginConfig.DebugConfig, - DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig, - ManagementService: em.TreatmentServicePluginConfig.ManagementService, - MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig, - SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig, - NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig, - SentryConfig: em.TreatmentServicePluginConfig.SentryConfig, - SegmenterConfig: *treatmentServiceConfig.SegmenterConfig, + Port: em.TreatmentServicePluginConfig.Port, + ProjectIds: []string{strconv.Itoa(projectID)}, + AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger, + DebugConfig: em.TreatmentServicePluginConfig.DebugConfig, + DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig, + ManagementService: em.TreatmentServicePluginConfig.ManagementService, + MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig, + SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig, + NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig, + SentryConfig: em.TreatmentServicePluginConfig.SentryConfig, + SegmenterConfig: *treatmentServiceConfig.SegmenterConfig, + ManagementServicePollerConfig: em.TreatmentServicePluginConfig.ManagementServicePollerConfig, } messageQueueKind := *treatmentServiceConfig.MessageQueueConfig.Kind switch messageQueueKind { From c58eb82f90ab3e5b81dedb57156b72fe481070dc Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 12:38:20 +0800 Subject: [PATCH 3/9] Update message queue config with json tags --- common/messagequeue/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/messagequeue/config.go b/common/messagequeue/config.go index fd5d22f..27183a6 100644 --- a/common/messagequeue/config.go +++ b/common/messagequeue/config.go @@ -12,10 +12,10 @@ const ( type MessageQueueConfig struct { // The type of Message Queue for event updates - Kind MessageQueueKind `default:""` + Kind MessageQueueKind `json:"kind" default:""` // PubSubConfig captures the config related to publishing and subscribing to a PubSub Message Queue - PubSubConfig *PubSubConfig + PubSubConfig *PubSubConfig `json:"pub_sub_config"` } type PubSubConfig struct { From ee6a143a6c0a65839118bcf015ed093fae50a52a Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 13:22:50 +0800 Subject: [PATCH 4/9] Update xp plugin to use message queue config from manager instead of xp management --- plugins/turing/config/config.go | 2 ++ plugins/turing/manager/experiment_manager.go | 21 +------------------ .../turing/manager/experiment_manager_test.go | 8 +++++++ 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/plugins/turing/config/config.go b/plugins/turing/config/config.go index 4bc0df4..d9917e5 100644 --- a/plugins/turing/config/config.go +++ b/plugins/turing/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "sync" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" @@ -49,6 +50,7 @@ type TreatmentServicePluginConfig struct { AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` DebugConfig config.DebugConfig `json:"debug_config"` DeploymentConfig config.DeploymentConfig `json:"deployment_config"` + MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config"` ManagementService config.ManagementServiceConfig `json:"management_service"` MonitoringConfig config.Monitoring `json:"monitoring_config"` SwaggerConfig config.SwaggerConfig `json:"swagger_config"` diff --git a/plugins/turing/manager/experiment_manager.go b/plugins/turing/manager/experiment_manager.go index 26b8681..9c93c2f 100644 --- a/plugins/turing/manager/experiment_manager.go +++ b/plugins/turing/manager/experiment_manager.go @@ -14,7 +14,6 @@ import ( xpclient "github.com/caraml-dev/xp/clients/management" "github.com/caraml-dev/xp/common/api/schema" - common_mq_config "github.com/caraml-dev/xp/common/messagequeue" _config "github.com/caraml-dev/xp/plugins/turing/config" "github.com/caraml-dev/xp/treatment-service/config" "github.com/go-playground/validator/v10" @@ -147,6 +146,7 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig( AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger, DebugConfig: em.TreatmentServicePluginConfig.DebugConfig, DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig, + MessageQueueConfig: em.TreatmentServicePluginConfig.MessageQueueConfig, ManagementService: em.TreatmentServicePluginConfig.ManagementService, MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig, SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig, @@ -155,25 +155,6 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig( SegmenterConfig: *treatmentServiceConfig.SegmenterConfig, ManagementServicePollerConfig: em.TreatmentServicePluginConfig.ManagementServicePollerConfig, } - messageQueueKind := *treatmentServiceConfig.MessageQueueConfig.Kind - switch messageQueueKind { - case schema.MessageQueueKindPubsub: - pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{ - Kind: "pubsub", - PubSubConfig: &common_mq_config.PubSubConfig{ - Project: *treatmentServiceConfig.MessageQueueConfig.PubSub.Project, - TopicName: *treatmentServiceConfig.MessageQueueConfig.PubSub.TopicName, - PubSubTimeoutSeconds: em.TreatmentServicePluginConfig.PubSubTimeoutSeconds, - }, - } - case schema.MessageQueueKindNoop: - pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{ - Kind: "", - } - default: - return nil, fmt.Errorf("invalid message queue kind (%s) was provided", messageQueueKind) - } - return pluginConfig, nil } diff --git a/plugins/turing/manager/experiment_manager_test.go b/plugins/turing/manager/experiment_manager_test.go index 045a012..a257c91 100644 --- a/plugins/turing/manager/experiment_manager_test.go +++ b/plugins/turing/manager/experiment_manager_test.go @@ -62,6 +62,14 @@ func TestNewExperimentManager(t *testing.T) { "environment_type": "dev", "max_go_routines": 200 }, + "message_queue_config": { + "kind": "dev", + "pub_sub_config": { + "project":"dev", + "topic_name":"xp-update", + "pub_sub_timeout_seconds": 30 + } + }, "management_service": { "authorization_enabled": true, "url": "http://xp-management.global.io/api/xp/v1" From 20009ce390a37e89474a32b70d1cf93d5c214213 Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 13:28:06 +0800 Subject: [PATCH 5/9] Reorder imports --- plugins/turing/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/turing/config/config.go b/plugins/turing/config/config.go index d9917e5..d55b82d 100644 --- a/plugins/turing/config/config.go +++ b/plugins/turing/config/config.go @@ -2,11 +2,11 @@ package config import ( "fmt" - common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "sync" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" + common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/config" "github.com/go-playground/validator/v10" ) From 3c43ebc76fa02d00516d13c7715019a5e92a8a71 Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 16:21:49 +0800 Subject: [PATCH 6/9] Change poller interval field to int --- treatment-service/config/config.go | 8 +++----- treatment-service/config/config_test.go | 12 +++++------- treatment-service/go.mod | 4 ---- treatment-service/go.sum | 6 ------ treatment-service/server/poller.go | 5 +++-- 5 files changed, 11 insertions(+), 24 deletions(-) diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index e8ca67f..afc8be9 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -2,14 +2,12 @@ package config import ( "fmt" - "strconv" - "time" - "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" common_config "github.com/caraml-dev/xp/common/config" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/models" + "strconv" ) type AssignedTreatmentLoggerKind = string @@ -96,8 +94,8 @@ type ManagementServiceConfig struct { } type ManagementServicePollerConfig struct { - Enabled bool `json:"enabled" default:"false"` - PollInterval time.Duration `json:"poll_interval" default:"30s"` + Enabled bool `json:"enabled" default:"false"` + PollIntervalSeconds int `json:"poll_interval" default:"30"` } func (c *Config) GetProjectIds() []models.ProjectId { diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index 9c04fd6..a67a39e 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -1,13 +1,11 @@ package config import ( - "testing" - "time" - "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "testing" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" ) @@ -66,8 +64,8 @@ func TestDefaultConfigs(t *testing.T) { SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), ManagementServicePollerConfig: ManagementServicePollerConfig{ - Enabled: false, - PollInterval: 30 * time.Second, + Enabled: false, + PollIntervalSeconds: 30, }, } cfg, err := Load() @@ -133,8 +131,8 @@ func TestLoadMultipleConfigs(t *testing.T) { SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, ManagementServicePollerConfig: ManagementServicePollerConfig{ - Enabled: false, - PollInterval: 30 * time.Second, + Enabled: false, + PollIntervalSeconds: 30, }, } diff --git a/treatment-service/go.mod b/treatment-service/go.mod index c4c1d47..fda4fab 100644 --- a/treatment-service/go.mod +++ b/treatment-service/go.mod @@ -14,7 +14,6 @@ require ( github.com/deepmap/oapi-codegen v1.11.0 github.com/getkin/kin-openapi v0.94.0 github.com/go-chi/chi/v5 v5.0.7 - github.com/go-playground/validator/v10 v10.11.1 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 github.com/google/go-cmp v0.6.0 @@ -104,8 +103,6 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect - github.com/go-playground/locales v0.14.0 // indirect - github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -139,7 +136,6 @@ require ( github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect - github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/treatment-service/go.sum b/treatment-service/go.sum index 788fc76..f4f0b24 100644 --- a/treatment-service/go.sum +++ b/treatment-service/go.sum @@ -333,18 +333,13 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= -github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= -github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= -github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -604,7 +599,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks= github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y= github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ= diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index da636d4..ebcb837 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -26,13 +26,14 @@ func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage * } func (p *Poller) Start() { - ticker := time.NewTicker(p.pollerConfig.PollInterval) + pollInterval := time.Duration(p.pollerConfig.PollIntervalSeconds) * time.Second + ticker := time.NewTicker(pollInterval) go func() { for { select { case <-ticker.C: err := p.Refresh() - log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + log.Printf("Polling at %v with interval %v", time.Now(), pollInterval) if err != nil { log.Printf("Error updating local storage: %v", err) continue From 8fd757e322b47e43e87f4cfa8ab62989883a0062 Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 16:28:47 +0800 Subject: [PATCH 7/9] Reorder imports --- treatment-service/config/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index afc8be9..d2c1d5f 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -2,12 +2,13 @@ package config import ( "fmt" + "strconv" + "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" common_config "github.com/caraml-dev/xp/common/config" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/models" - "strconv" ) type AssignedTreatmentLoggerKind = string From 1cf57ff6e955bdee7c0066dc6cc4c5818e1feb12 Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 16:31:11 +0800 Subject: [PATCH 8/9] Reorder imports --- treatment-service/config/config_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index a67a39e..843faa1 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -1,11 +1,12 @@ package config import ( + "testing" + "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" ) From a58a714f32567e256f86efa82f53f9245328ca2b Mon Sep 17 00:00:00 2001 From: eweziyi Date: Fri, 17 Jan 2025 17:27:15 +0800 Subject: [PATCH 9/9] Refactor poller into a service --- plugins/turing/runner/experiment_runner.go | 3 +++ treatment-service/appcontext/appcontext.go | 7 +++++++ treatment-service/server/server.go | 17 ++++------------- .../poller.go => services/poller_service.go} | 17 +++++++++-------- 4 files changed, 23 insertions(+), 21 deletions(-) rename treatment-service/{server/poller.go => services/poller_service.go} (67%) diff --git a/plugins/turing/runner/experiment_runner.go b/plugins/turing/runner/experiment_runner.go index 197b737..12a39ba 100644 --- a/plugins/turing/runner/experiment_runner.go +++ b/plugins/turing/runner/experiment_runner.go @@ -257,6 +257,9 @@ func (er *experimentRunner) startBackgroundServices( } }() } + if er.appContext.PollerService != nil { + er.appContext.PollerService.Start() + } } func (er *experimentRunner) getRequestParams( diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 8e96126..e74c1fb 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -24,6 +24,7 @@ type AppContext struct { AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger LocalStorage *models.LocalStorage + PollerService *services.PollerService } func NewAppContext(cfg *config.Config) (*AppContext, error) { @@ -122,6 +123,11 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { return nil, err } + var pollerService *services.PollerService + if cfg.ManagementServicePollerConfig.Enabled { + pollerService = services.NewPollerService(cfg.ManagementServicePollerConfig, localStorage) + } + appContext := &AppContext{ ExperimentService: experimentSvc, MetricService: metricService, @@ -131,6 +137,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { AssignedTreatmentLogger: logger, MessageQueueService: messageQueueService, LocalStorage: localStorage, + PollerService: pollerService, } return appContext, nil diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index e94f17d..8704b71 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -33,8 +33,6 @@ type Server struct { subscribe bool // cleanup captures all the actions to be executed on server shut down cleanup []func() - // poller captures the poller instance - poller *Poller } // NewServer creates and configures an APIServer serving all application routes. @@ -108,11 +106,6 @@ func NewServer(configFiles []string) (*Server, error) { subscribe = true } - var poller *Poller - if cfg.ManagementServicePollerConfig.Enabled { - poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage) - } - srv := http.Server{ Addr: cfg.ListenAddress(), Handler: mux, @@ -123,7 +116,6 @@ func NewServer(configFiles []string) (*Server, error) { appContext: appCtx, subscribe: subscribe, cleanup: cleanup, - poller: poller, }, nil } @@ -141,11 +133,6 @@ func (srv *Server) Start() { }() log.Printf("Listening on %s\n", srv.Addr) - if srv.poller != nil { - log.Println("Starting poller...") - srv.poller.Start() - } - stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt) @@ -193,5 +180,9 @@ func (srv *Server) startBackgroundService(errChannel chan error) context.CancelF } }() + if srv.appContext.PollerService != nil { + srv.appContext.PollerService.Start() + } + return cancel } diff --git a/treatment-service/server/poller.go b/treatment-service/services/poller_service.go similarity index 67% rename from treatment-service/server/poller.go rename to treatment-service/services/poller_service.go index ebcb837..8d6928a 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/services/poller_service.go @@ -1,4 +1,4 @@ -package server +package services import ( "log" @@ -8,24 +8,25 @@ import ( "github.com/caraml-dev/xp/treatment-service/models" ) -type Poller struct { +type PollerService struct { pollerConfig config.ManagementServicePollerConfig localStorage *models.LocalStorage stopChannel chan struct{} } -// NewPoller creates a new Poller instance with the given configuration and local storage. +// NewPollerService creates a new PollerService instance with the given configuration and local storage. // pollerConfig: configuration for the poller // localStorage: local storage to be used by the poller -func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller { - return &Poller{ +func NewPollerService(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *PollerService { + return &PollerService{ pollerConfig: pollerConfig, localStorage: localStorage, stopChannel: make(chan struct{}), } } -func (p *Poller) Start() { +func (p *PollerService) Start() { + log.Println("Starting management service poller service...") pollInterval := time.Duration(p.pollerConfig.PollIntervalSeconds) * time.Second ticker := time.NewTicker(pollInterval) go func() { @@ -46,11 +47,11 @@ func (p *Poller) Start() { }() } -func (p *Poller) Stop() { +func (p *PollerService) Stop() { close(p.stopChannel) } -func (p *Poller) Refresh() error { +func (p *PollerService) Refresh() error { err := p.localStorage.Init() return err }