From d79dafee0106d2dbad971d5a68a2f7adebe25dd9 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 10:46:40 +0000 Subject: [PATCH 01/24] wip Signed-off-by: James Milligan --- cmd/start.go | 17 ++++- pkg/runtime/from_config.go | 96 ++++++++++++++++++-------- pkg/runtime/runtime.go | 8 +-- pkg/sync/file/filepath_sync.go | 6 +- pkg/sync/grpc/grpc_sync.go | 1 + pkg/sync/http/http_sync.go | 19 +++-- pkg/sync/http/http_sync_test.go | 9 +-- pkg/sync/isync.go | 9 ++- pkg/sync/kubernetes/kubernetes_sync.go | 10 +-- 9 files changed, 113 insertions(+), 62 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index ee6f1b597..0fb864955 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -7,6 +7,7 @@ import ( "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/runtime" + "github.com/open-feature/flagd/pkg/sync" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" @@ -105,17 +106,27 @@ var startCmd = &cobra.Command{ rtLogger.Warn("DEPRECATED: The --evaluator flag has been deprecated. " + "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") } + + syncProvidersFromURI, err := runtime.SyncProvidersFromArgs(viper.GetStringSlice(uriFlagName)) + if err != nil { + log.Fatal(err) + } + + syncProviders := []sync.SyncProviderConfig{} + if err := viper.UnmarshalKey(syncProviderFlagName, &syncProviders); err != nil { + log.Fatal(err) + } + syncProviders = append(syncProviders, syncProvidersFromURI...) + // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ CORS: viper.GetStringSlice(corsFlagName), MetricsPort: viper.GetInt32(metricsPortFlagName), - ProviderArgs: viper.GetStringMapString(providerArgsFlagName), ServiceCertPath: viper.GetString(serverCertPathFlagName), ServiceKeyPath: viper.GetString(serverKeyPathFlagName), ServicePort: viper.GetInt32(portFlagName), ServiceSocketPath: viper.GetString(socketPathFlagName), - SyncBearerToken: viper.GetString(bearerTokenFlagName), - SyncURI: viper.GetStringSlice(uriFlagName), + SyncProviders: syncProviders, }) if err != nil { rtLogger.Fatal(err.Error()) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 7e313de9c..b530e3210 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -19,6 +19,13 @@ import ( "go.uber.org/zap" ) +const ( + syncProviderFile = "file" + syncProviderGrpc = "grpc" + syncProviderKubernetes = "kubernetes" + syncProviderHttp = "http" +) + var ( regCrd *regexp.Regexp regURL *regexp.Regexp @@ -64,43 +71,43 @@ func (r *Runtime) setService(logger *logger.Logger) { func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { rtLogger := logger.WithFields(zap.String("component", "runtime")) - r.SyncImpl = make([]sync.ISync, 0, len(r.config.SyncURI)) - for _, uri := range r.config.SyncURI { - switch uriB := []byte(uri); { - case regFile.Match(uriB): + r.SyncImpl = make([]sync.ISync, 0, len(r.config.SyncProviders)) + for _, syncProvider := range r.config.SyncProviders { + switch syncProvider.Provider { + case syncProviderFile: r.SyncImpl = append( r.SyncImpl, - r.newFile(uri, logger), + r.newFile(syncProvider, logger), ) - rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri)) - case regCrd.Match(uriB): + rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI)) + case syncProviderKubernetes: r.SyncImpl = append( r.SyncImpl, - r.newK8s(uri, logger), + r.newK8s(syncProvider, logger), ) - rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri)) - case regURL.Match(uriB): + rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI)) + case syncProviderHttp: r.SyncImpl = append( r.SyncImpl, - r.newHTTP(uri, logger), + r.newHTTP(syncProvider, logger), ) - rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) - case regGRPC.Match(uriB): + rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", syncProvider.URI)) + case syncProviderGrpc: r.SyncImpl = append( r.SyncImpl, - r.newGRPC(uri, logger), + r.newGRPC(syncProvider, logger), ) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ - " or 'core.openfeature.dev'", uri) + " or 'core.openfeature.dev'", syncProvider.URI) } } return nil } -func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync { +func (r *Runtime) newGRPC(config sync.SyncProviderConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), + Target: grpc.URLToGRPCTarget(config.URI), Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), @@ -109,10 +116,9 @@ func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync { } } -func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync { +func (r *Runtime) newHTTP(config sync.SyncProviderConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ - URI: uri, - BearerToken: r.config.SyncBearerToken, + URI: config.URI, Client: &http.Client{ Timeout: time.Second * 10, }, @@ -120,30 +126,62 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync { zap.String("component", "sync"), zap.String("sync", "remote"), ), - ProviderArgs: r.config.ProviderArgs, - Cron: cron.New(), + Config: config, + Cron: cron.New(), } } -func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync { +func (r *Runtime) newK8s(config sync.SyncProviderConfig, logger *logger.Logger) *kubernetes.Sync { return &kubernetes.Sync{ Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "kubernetes"), ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, + URI: config.URI, + Config: config, } } -func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync { +func (r *Runtime) newFile(config sync.SyncProviderConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), + URI: config.URI, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "filepath"), ), - ProviderArgs: r.config.ProviderArgs, - Mux: &msync.RWMutex{}, + Config: config, + Mux: &msync.RWMutex{}, + } +} + +func SyncProvidersFromArgs(uris []string) ([]sync.SyncProviderConfig, error) { + syncProvidersParsed := []sync.SyncProviderConfig{} + for _, uri := range uris { + switch uriB := []byte(uri); { + case regFile.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + URI: regFile.ReplaceAllString(uri, ""), + Provider: syncProviderFile, + }) + case regCrd.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + URI: regCrd.ReplaceAllString(uri, ""), + Provider: syncProviderKubernetes, + }) + case regURL.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + URI: uri, + Provider: syncProviderHttp, + }) + case regGRPC.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + URI: uri, + Provider: syncProviderGrpc, + }) + default: + return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ + " or 'core.openfeature.dev'", uri) + } } + return syncProvidersParsed, nil } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 4772eaef3..41b150bfa 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,12 +33,8 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - ProviderArgs sync.ProviderArgs - SyncURI []string - RemoteSyncType string - SyncBearerToken string - - CORS []string + SyncProviders []sync.SyncProviderConfig + CORS []string } func (r *Runtime) Start() error { diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index 5fc31ebfd..9763a2d1b 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -17,9 +17,9 @@ import ( ) type Sync struct { - URI string - Logger *logger.Logger - ProviderArgs sync.ProviderArgs + URI string + Logger *logger.Logger + Config sync.SyncProviderConfig // FileType indicates the file type e.g., json, yaml/yml etc., fileType string watcher *fsnotify.Watcher diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index a36c7ac35..c274a8c6a 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -39,6 +39,7 @@ type Sync struct { options []grpc.DialOption ready bool Mux *msync.RWMutex + Config sync.SyncProviderConfig } func (g *Sync) Init(ctx context.Context) error { diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index c2a56460f..8f2a786dd 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -16,14 +16,13 @@ import ( ) type Sync struct { - URI string - Client Client - Cron Cron - BearerToken string - LastBodySHA string - Logger *logger.Logger - ProviderArgs sync.ProviderArgs - ready bool + URI string + Client Client + Cron Cron + LastBodySHA string + Logger *logger.Logger + ready bool + Config sync.SyncProviderConfig } // Client defines the behaviour required of a http client @@ -108,8 +107,8 @@ func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) ([]byte, error req.Header.Add("Accept", "application/json") - if hs.BearerToken != "" { - bearer := "Bearer " + hs.BearerToken + if hs.Config.BearerToken != "" { + bearer := fmt.Sprintf("Bearer %s", hs.Config.BearerToken) req.Header.Set("Authorization", bearer) } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index e7e1596c4..952f04fe9 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -32,7 +32,6 @@ func TestSimpleSync(t *testing.T) { URI: "http://localhost", Client: mockClient, Cron: mockCron, - BearerToken: "", LastBodySHA: "", Logger: logger.NewLogger(nil, false), } @@ -146,9 +145,11 @@ func TestHTTPSync_Fetch(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - BearerToken: tt.bearerToken, + URI: tt.uri, + Client: mockClient, + Config: sync.SyncProviderConfig{ + BearerToken: tt.bearerToken, + }, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 7af60ff9d..c370c4f72 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -2,8 +2,6 @@ package sync import "context" -type ProviderArgs map[string]string - type Type int // Type of the sync operation @@ -55,3 +53,10 @@ type DataSync struct { Source string Type } + +type SyncProviderConfig struct { + URI string `json:"uri"` + Provider string `json:"provider"` + + BearerToken string `json:"bearerToken,omitempty"` +} diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 632599d67..729fe95f7 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -26,11 +26,11 @@ import ( var resyncPeriod = 1 * time.Minute type Sync struct { - Logger *logger.Logger - ProviderArgs sync.ProviderArgs - client client.Client - URI string - ready bool + Logger *logger.Logger + Config sync.SyncProviderConfig + client client.Client + URI string + ready bool } func (k *Sync) Init(ctx context.Context) error { From ee8d8232c629ade7237c17a7c9e1b9ff416e8ffe Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 13:55:06 +0000 Subject: [PATCH 02/24] docs Signed-off-by: James Milligan --- pkg/runtime/from_config.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index b530e3210..dc9f2d317 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -1,6 +1,8 @@ package runtime import ( + "encoding/json" + "errors" "fmt" "net/http" "regexp" @@ -154,7 +156,23 @@ func (r *Runtime) newFile(config sync.SyncProviderConfig, logger *logger.Logger) } } -func SyncProvidersFromArgs(uris []string) ([]sync.SyncProviderConfig, error) { +func SyncProviderArgPass(syncProviders string) ([]sync.SyncProviderConfig, error) { + syncProvidersParsed := []sync.SyncProviderConfig{} + if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil { + return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err) + } + for _, sp := range syncProvidersParsed { + if sp.URI == "" { + return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field") + } + if sp.Provider == "" { + return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field") + } + } + return syncProvidersParsed, nil +} + +func SyncProvidersFromURIs(uris []string) ([]sync.SyncProviderConfig, error) { syncProvidersParsed := []sync.SyncProviderConfig{} for _, uri := range uris { switch uriB := []byte(uri); { From 35a84b53804a903eba4ea49196237f47b5cd50a0 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 13:55:11 +0000 Subject: [PATCH 03/24] docs Signed-off-by: James Milligan --- cmd/start.go | 30 ++++++++++++++++++----------- docs/configuration/configuration.md | 28 +++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 0fb864955..aa921a630 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -25,6 +25,7 @@ const ( serverCertPathFlagName = "server-cert-path" serverKeyPathFlagName = "server-key-path" socketPathFlagName = "socket-path" + syncProvidersFlagName = "sync-providers" syncProviderFlagName = "sync-provider" uriFlagName = "uri" ) @@ -58,6 +59,10 @@ func init() { flags.StringP( syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) + flags.StringP( + syncProvidersFlagName, "y", "", "JSON representation of an array of SyncProviderConfig objects. This object contains "+ + "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: ", + ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) @@ -71,6 +76,7 @@ func init() { _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) _ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName)) + _ = viper.BindPFlag(syncProvidersFlagName, flags.Lookup(syncProviderFlagName)) _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) } @@ -97,26 +103,28 @@ var startCmd = &cobra.Command{ rtLogger.Info(fmt.Sprintf("flagd version: %s (%s), built at: %s", Version, Commit, Date)) - if viper.GetString(syncProviderFlagName) != "" { - rtLogger.Warn("DEPRECATED: The --sync-provider flag has been deprecated. " + - "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") - } - if viper.GetString(evaluatorFlagName) != "json" { rtLogger.Warn("DEPRECATED: The --evaluator flag has been deprecated. " + "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") } - syncProvidersFromURI, err := runtime.SyncProvidersFromArgs(viper.GetStringSlice(uriFlagName)) + syncProviders, err := runtime.SyncProvidersFromURIs(viper.GetStringSlice(uriFlagName)) if err != nil { log.Fatal(err) } - - syncProviders := []sync.SyncProviderConfig{} - if err := viper.UnmarshalKey(syncProviderFlagName, &syncProviders); err != nil { - log.Fatal(err) + syncProviders2 := []sync.SyncProviderConfig{} + if cfgFile == "" { + syncProviders2, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) + if err != nil { + log.Fatal(err) + } + } else { + err = viper.UnmarshalKey(syncProvidersFlagName, &syncProviders2) + if err != nil { + log.Fatal(err) + } } - syncProviders = append(syncProviders, syncProvidersFromURI...) + syncProviders = append(syncProviders, syncProviders2...) // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 2a209bdef..6be2ecf27 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -10,7 +10,7 @@ Environment variable keys are uppercased, prefixed with `FLAGD_` and all `-` are Config file expects the keys to have the exact naming as the flags. -### URI patterns +### URI patterns Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation: @@ -22,7 +22,6 @@ Any URI passed to flagd via the `--uri` flag must follow one of the 4 following | Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` | - ### Customising sync providers Custom sync providers can be used to provide flag evaluation logic. @@ -35,4 +34,29 @@ To use an existing FeatureFlagConfiguration custom resource, start flagD with th ```shell flagd start --uri core.openfeature.dev/default/my_example +``` + +### Sync Provider Configuration + +While a URI may be passed to flagd via the `--uri` flag, some implementations may require further configurations. In these cases the `--sync-providers` flag should be used. +The flag takes a string argument, which should be a JSON representation of an array of SyncProviderConfig objects. Alternatively, these configurations should be passed to +flagd via config file, specified using the `--config` flag. + +| Field | Type | +|------------|------------------------------------| +| uri | required `string` | | +| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) | +| bearerToken | optional `string` | + +The `uri` field values do not need to follow the [URI patterns](#uri-patterns), the provider type is instead derived from the provider field. + +Example start command using a filepath sync provider and the equivalent config file definition: +```sh +./flagd start --sync-providers=\[{\"uri\":\"config/samples/example_flags.json\"\,\"provider\":\"file\"}\] +``` + +```yaml +sync-providers: +- uri: config/samples/example_flags.json + provider: file ``` \ No newline at end of file From e88e47b00b13d5e9c2375f6fa768c2428df77044 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 15:53:11 +0000 Subject: [PATCH 04/24] linting Signed-off-by: James Milligan --- cmd/start.go | 4 +-- docs/configuration/configuration.md | 2 +- pkg/runtime/from_config.go | 34 +++++++++++++------------- pkg/runtime/runtime.go | 2 +- pkg/sync/file/filepath_sync.go | 2 +- pkg/sync/grpc/grpc_sync.go | 2 +- pkg/sync/http/http_sync.go | 2 +- pkg/sync/http/http_sync_test.go | 2 +- pkg/sync/isync.go | 2 +- pkg/sync/kubernetes/kubernetes_sync.go | 2 +- 10 files changed, 27 insertions(+), 27 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index aa921a630..ebdd854b0 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -60,7 +60,7 @@ func init() { syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) flags.StringP( - syncProvidersFlagName, "y", "", "JSON representation of an array of SyncProviderConfig objects. This object contains "+ + syncProvidersFlagName, "y", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: ", ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") @@ -112,7 +112,7 @@ var startCmd = &cobra.Command{ if err != nil { log.Fatal(err) } - syncProviders2 := []sync.SyncProviderConfig{} + syncProviders2 := []sync.ProviderConfig{} if cfgFile == "" { syncProviders2, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) if err != nil { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 6be2ecf27..087f81819 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -39,7 +39,7 @@ flagd start --uri core.openfeature.dev/default/my_example ### Sync Provider Configuration While a URI may be passed to flagd via the `--uri` flag, some implementations may require further configurations. In these cases the `--sync-providers` flag should be used. -The flag takes a string argument, which should be a JSON representation of an array of SyncProviderConfig objects. Alternatively, these configurations should be passed to +The flag takes a string argument, which should be a JSON representation of an array of `ProviderConfig` objects. Alternatively, these configurations should be passed to flagd via config file, specified using the `--config` flag. | Field | Type | diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index dc9f2d317..8b93c3773 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -25,7 +25,7 @@ const ( syncProviderFile = "file" syncProviderGrpc = "grpc" syncProviderKubernetes = "kubernetes" - syncProviderHttp = "http" + syncProviderHTTP = "http" ) var ( @@ -88,7 +88,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { r.newK8s(syncProvider, logger), ) rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI)) - case syncProviderHttp: + case syncProviderHTTP: r.SyncImpl = append( r.SyncImpl, r.newHTTP(syncProvider, logger), @@ -107,7 +107,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { return nil } -func (r *Runtime) newGRPC(config sync.SyncProviderConfig, logger *logger.Logger) *grpc.Sync { +func (r *Runtime) newGRPC(config sync.ProviderConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ Target: grpc.URLToGRPCTarget(config.URI), Logger: logger.WithFields( @@ -118,7 +118,7 @@ func (r *Runtime) newGRPC(config sync.SyncProviderConfig, logger *logger.Logger) } } -func (r *Runtime) newHTTP(config sync.SyncProviderConfig, logger *logger.Logger) *httpSync.Sync { +func (r *Runtime) newHTTP(config sync.ProviderConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ URI: config.URI, Client: &http.Client{ @@ -133,7 +133,7 @@ func (r *Runtime) newHTTP(config sync.SyncProviderConfig, logger *logger.Logger) } } -func (r *Runtime) newK8s(config sync.SyncProviderConfig, logger *logger.Logger) *kubernetes.Sync { +func (r *Runtime) newK8s(config sync.ProviderConfig, logger *logger.Logger) *kubernetes.Sync { return &kubernetes.Sync{ Logger: logger.WithFields( zap.String("component", "sync"), @@ -144,7 +144,7 @@ func (r *Runtime) newK8s(config sync.SyncProviderConfig, logger *logger.Logger) } } -func (r *Runtime) newFile(config sync.SyncProviderConfig, logger *logger.Logger) *file.Sync { +func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ URI: config.URI, Logger: logger.WithFields( @@ -156,8 +156,8 @@ func (r *Runtime) newFile(config sync.SyncProviderConfig, logger *logger.Logger) } } -func SyncProviderArgPass(syncProviders string) ([]sync.SyncProviderConfig, error) { - syncProvidersParsed := []sync.SyncProviderConfig{} +func SyncProviderArgPass(syncProviders string) ([]sync.ProviderConfig, error) { + syncProvidersParsed := []sync.ProviderConfig{} if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil { return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err) } @@ -172,33 +172,33 @@ func SyncProviderArgPass(syncProviders string) ([]sync.SyncProviderConfig, error return syncProvidersParsed, nil } -func SyncProvidersFromURIs(uris []string) ([]sync.SyncProviderConfig, error) { - syncProvidersParsed := []sync.SyncProviderConfig{} +func SyncProvidersFromURIs(uris []string) ([]sync.ProviderConfig, error) { + syncProvidersParsed := []sync.ProviderConfig{} for _, uri := range uris { switch uriB := []byte(uri); { case regFile.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: regFile.ReplaceAllString(uri, ""), Provider: syncProviderFile, }) case regCrd.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: regCrd.ReplaceAllString(uri, ""), Provider: syncProviderKubernetes, }) case regURL.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: uri, - Provider: syncProviderHttp, + Provider: syncProviderHTTP, }) case regGRPC.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SyncProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: uri, Provider: syncProviderGrpc, }) default: - return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ - " or 'core.openfeature.dev'", uri) + return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+ + "'http(s)://', 'grpc://', or 'core.openfeature.dev'", uri) } } return syncProvidersParsed, nil diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 41b150bfa..6526745a9 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,7 +33,7 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - SyncProviders []sync.SyncProviderConfig + SyncProviders []sync.ProviderConfig CORS []string } diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index 9763a2d1b..551a3dc3f 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -19,7 +19,7 @@ import ( type Sync struct { URI string Logger *logger.Logger - Config sync.SyncProviderConfig + Config sync.ProviderConfig // FileType indicates the file type e.g., json, yaml/yml etc., fileType string watcher *fsnotify.Watcher diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index c274a8c6a..1a81d72d0 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -39,7 +39,7 @@ type Sync struct { options []grpc.DialOption ready bool Mux *msync.RWMutex - Config sync.SyncProviderConfig + Config sync.ProviderConfig } func (g *Sync) Init(ctx context.Context) error { diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 8f2a786dd..3bee580a8 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -22,7 +22,7 @@ type Sync struct { LastBodySHA string Logger *logger.Logger ready bool - Config sync.SyncProviderConfig + Config sync.ProviderConfig } // Client defines the behaviour required of a http client diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 952f04fe9..b43c7fe1c 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -147,7 +147,7 @@ func TestHTTPSync_Fetch(t *testing.T) { httpSync := Sync{ URI: tt.uri, Client: mockClient, - Config: sync.SyncProviderConfig{ + Config: sync.ProviderConfig{ BearerToken: tt.bearerToken, }, LastBodySHA: tt.lastBodySHA, diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index c370c4f72..6b011a6ce 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -54,7 +54,7 @@ type DataSync struct { Type } -type SyncProviderConfig struct { +type ProviderConfig struct { URI string `json:"uri"` Provider string `json:"provider"` diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 729fe95f7..8118fa567 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -27,7 +27,7 @@ var resyncPeriod = 1 * time.Minute type Sync struct { Logger *logger.Logger - Config sync.SyncProviderConfig + Config sync.ProviderConfig client client.Client URI string ready bool From 484814fb8c3d9a7c3671199f4f969685984f5184 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 15:55:30 +0000 Subject: [PATCH 05/24] docs Signed-off-by: James Milligan --- cmd/start.go | 2 +- docs/configuration/flagd_start.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/start.go b/cmd/start.go index ebdd854b0..4c53ff5be 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -60,7 +60,7 @@ func init() { syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) flags.StringP( - syncProvidersFlagName, "y", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ + syncProvidersFlagName, "s", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: ", ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 0cc8945f3..bf54b4b0e 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,6 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) + -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` From f8aef135a9590588714c805b02280afa7abc6bf9 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 16:04:16 +0000 Subject: [PATCH 06/24] bug fix Signed-off-by: James Milligan --- cmd/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/start.go b/cmd/start.go index 4c53ff5be..c6ff4101a 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -113,7 +113,7 @@ var startCmd = &cobra.Command{ log.Fatal(err) } syncProviders2 := []sync.ProviderConfig{} - if cfgFile == "" { + if cfgFile == "" && viper.GetString(syncProvidersFlagName) != "" { syncProviders2, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) if err != nil { log.Fatal(err) From 71f09e70c43686a39daf7e760e4a798c71e084a7 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Wed, 1 Mar 2023 12:06:23 +0000 Subject: [PATCH 07/24] test coverage Signed-off-by: James Milligan --- pkg/runtime/runtime_test.go | 156 ++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 pkg/runtime/runtime_test.go diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go new file mode 100644 index 000000000..f1321a985 --- /dev/null +++ b/pkg/runtime/runtime_test.go @@ -0,0 +1,156 @@ +package runtime_test + +import ( + "reflect" + "testing" + + "github.com/open-feature/flagd/pkg/runtime" + "github.com/open-feature/flagd/pkg/sync" +) + +func TestSyncProviderArgPass(t *testing.T) { + test := map[string]struct { + in string + expectErr bool + out []sync.ProviderConfig + }{ + "simple": { + in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]", + expectErr: false, + out: []sync.ProviderConfig{ + { + URI: "config/samples/example_flags.json", + Provider: "file", + }, + }, + }, + "multiple-syncs": { + in: `[ + {"uri":"config/samples/example_flags.json","provider":"file"}, + {"uri":"http://test.com","provider":"http","bearerToken":":)"}, + {"uri":"host:port","provider":"grpc"}, + {"uri":"default/my-crd","provider":"kubernetes"} + ]`, + expectErr: false, + out: []sync.ProviderConfig{ + { + URI: "config/samples/example_flags.json", + Provider: "file", + }, + { + URI: "http://test.com", + Provider: "http", + BearerToken: ":)", + }, + { + URI: "host:port", + Provider: "grpc", + }, + { + URI: "default/my-crd", + Provider: "kubernetes", + }, + }, + }, + "empty": { + in: `[]`, + expectErr: false, + out: []sync.ProviderConfig{}, + }, + "parse-failure": { + in: ``, + expectErr: true, + out: []sync.ProviderConfig{}, + }, + } + + for name, tt := range test { + t.Run(name, func(t *testing.T) { + out, err := runtime.SyncProviderArgPass(tt.in) + if tt.expectErr { + if err == nil { + t.Error("expected error, got none") + } + } else if err != nil { + t.Errorf("did not expect error: %s", err.Error()) + } + if !reflect.DeepEqual(out, tt.out) { + t.Errorf("unexpected output, expected %v, got %v", tt.out, out) + } + }) + } +} + +func TestSyncProvidersFromURIs(t *testing.T) { + test := map[string]struct { + in []string + expectErr bool + out []sync.ProviderConfig + }{ + "simple": { + in: []string{ + "file:my-file.json", + }, + expectErr: false, + out: []sync.ProviderConfig{ + { + URI: "my-file.json", + Provider: "file", + }, + }, + }, + "multiple-uris": { + in: []string{ + "file:my-file.json", + "https://test.com", + "grpc://host:port", + "core.openfeature.dev/default/my-crd", + }, + expectErr: false, + out: []sync.ProviderConfig{ + { + URI: "my-file.json", + Provider: "file", + }, + { + URI: "https://test.com", + Provider: "http", + }, + { + URI: "grpc://host:port", + Provider: "grpc", + }, + { + URI: "default/my-crd", + Provider: "kubernetes", + }, + }, + }, + "empty": { + in: []string{}, + expectErr: false, + out: []sync.ProviderConfig{}, + }, + "parse-failure": { + in: []string{"care.openfeature.dev/will/fail"}, + expectErr: true, + out: []sync.ProviderConfig{}, + }, + } + + for name, tt := range test { + t.Run(name, func(t *testing.T) { + out, err := runtime.SyncProvidersFromURIs(tt.in) + if tt.expectErr { + if err == nil { + t.Error("expected error, got none") + } + } else if err != nil { + t.Errorf("did not expect error: %s", err.Error()) + } + if !reflect.DeepEqual(out, tt.out) { + t.Errorf("unexpected output, expected %v, got %v", tt.out, out) + } + }) + } +} From a0c844e8f5fb8d7c86e66dc4d13eaa791c80fec1 Mon Sep 17 00:00:00 2001 From: James Milligan <75740990+james-milligan@users.noreply.github.com> Date: Wed, 1 Mar 2023 13:51:21 +0000 Subject: [PATCH 08/24] Apply suggestions from code review Co-authored-by: Skye Gill Signed-off-by: James Milligan <75740990+james-milligan@users.noreply.github.com> --- pkg/runtime/from_config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 8b93c3773..200fb1780 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -81,7 +81,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { r.SyncImpl, r.newFile(syncProvider, logger), ) - rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI)) + rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %s", syncProvider.URI)) case syncProviderKubernetes: r.SyncImpl = append( r.SyncImpl, @@ -93,7 +93,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { r.SyncImpl, r.newHTTP(syncProvider, logger), ) - rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", syncProvider.URI)) + rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI)) case syncProviderGrpc: r.SyncImpl = append( r.SyncImpl, From c4dee0ed3251ea6f9d882ee480a1440771ae05a5 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Wed, 1 Mar 2023 13:54:07 +0000 Subject: [PATCH 09/24] name update Signed-off-by: James Milligan --- cmd/start.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index c6ff4101a..633b63e3c 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -112,19 +112,19 @@ var startCmd = &cobra.Command{ if err != nil { log.Fatal(err) } - syncProviders2 := []sync.ProviderConfig{} + syncProvidersFromConfig := []sync.ProviderConfig{} if cfgFile == "" && viper.GetString(syncProvidersFlagName) != "" { - syncProviders2, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) + syncProvidersFromConfig, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) if err != nil { log.Fatal(err) } } else { - err = viper.UnmarshalKey(syncProvidersFlagName, &syncProviders2) + err = viper.UnmarshalKey(syncProvidersFlagName, &syncProvidersFromConfig) if err != nil { log.Fatal(err) } } - syncProviders = append(syncProviders, syncProviders2...) + syncProviders = append(syncProviders, syncProvidersFromConfig...) // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ From 73ce052ce9739adca8a39cb50d033a8a82bbd748 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 2 Mar 2023 09:37:59 +0000 Subject: [PATCH 10/24] conflict fix Signed-off-by: James Milligan --- pkg/runtime/from_config.go | 6 +++++- pkg/sync/http/http_sync_test.go | 8 +++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index fa269181b..0d38cdfd7 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -45,7 +45,11 @@ func init() { func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) { s := store.NewFlags() - s.FlagSources = config.SyncURI + sources := []string{} + for _, sync := range config.SyncProviders { + sources = append(sources, sync.URI) + } + s.FlagSources = sources rt := Runtime{ config: config, Logger: logger.WithFields(zap.String("component", "runtime")), diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 0daec0478..187fbdbf2 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -223,9 +223,11 @@ func TestHTTPSync_Resync(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - BearerToken: tt.bearerToken, + URI: tt.uri, + Client: mockClient, + Config: sync.ProviderConfig{ + BearerToken: tt.bearerToken, + }, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } From 5c3942e340e4f8bf523651918892219dc412c840 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 2 Mar 2023 09:38:43 +0000 Subject: [PATCH 11/24] conflict fix Signed-off-by: James Milligan --- docs/configuration/flagd_start.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 95770ff30..3944d3f2f 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,7 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: + -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: TODO -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` From de6b6ba24d2ed3139249df1f577398b4cdc11ae4 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 2 Mar 2023 09:55:52 +0000 Subject: [PATCH 12/24] bug fix Signed-off-by: James Milligan --- cmd/start.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/start.go b/cmd/start.go index d631eec97..19c282c01 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -77,7 +77,7 @@ func init() { _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) _ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName)) - _ = viper.BindPFlag(syncProvidersFlagName, flags.Lookup(syncProviderFlagName)) + _ = viper.BindPFlag(syncProvidersFlagName, flags.Lookup(syncProvidersFlagName)) _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) } @@ -113,6 +113,7 @@ var startCmd = &cobra.Command{ if err != nil { log.Fatal(err) } + syncProvidersFromConfig := []sync.ProviderConfig{} if cfgFile == "" && viper.GetString(syncProvidersFlagName) != "" { syncProvidersFromConfig, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) From 86d392d2ca843b5ae5247775a48dcf8438fe9b86 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 2 Mar 2023 10:09:09 +0000 Subject: [PATCH 13/24] doc fix Signed-off-by: James Milligan --- docs/configuration/flagd_start.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 3944d3f2f..95770ff30 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,7 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: TODO + -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` From 3e7c41a41e13722d72ff2990a70b48744c89766f Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 2 Mar 2023 10:10:50 +0000 Subject: [PATCH 14/24] deprecation warning Signed-off-by: James Milligan --- cmd/start.go | 10 ++++++++-- docs/configuration/flagd_start.md | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 19c282c01..702d3cfa6 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -46,7 +46,7 @@ func init() { flags.StringP(serverCertPathFlagName, "c", "", "Server side tls certificate path") flags.StringP(serverKeyPathFlagName, "k", "", "Server side tls key path") flags.StringToStringP(providerArgsFlagName, - "a", nil, "Sync provider arguments as key values separated by =") + "a", nil, "DEPRECATED: Sync provider arguments as key values separated by =") flags.StringSliceP( uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+ "url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the "+ @@ -62,7 +62,8 @@ func init() { ) flags.StringP( syncProvidersFlagName, "s", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ - "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: ", + "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: "+ + "https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation", ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") @@ -109,6 +110,11 @@ var startCmd = &cobra.Command{ "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") } + if viper.GetStringMapString(providerArgsFlagName) != nil { + rtLogger.Warn("DEPRECATED: The --sync-provider-args flag has been deprecated. " + + "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") + } + syncProviders, err := runtime.SyncProvidersFromURIs(viper.GetStringSlice(uriFlagName)) if err != nil { log.Fatal(err) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 95770ff30..3fe9e21ae 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -20,8 +20,8 @@ flagd start [flags] -k, --server-key-path string Server side tls key path -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote - -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: + -a, --sync-provider-args stringToString DEPRECATED: Sync provider arguments as key values separated by = (default []) + -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` From 1f7f7aec31c358230576f76502befd0783386e50 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 7 Mar 2023 16:41:26 +0000 Subject: [PATCH 15/24] removed config object from syncs Signed-off-by: James Milligan --- pkg/runtime/from_config.go | 10 ++++------ pkg/sync/file/filepath_sync.go | 1 - pkg/sync/grpc/grpc_sync.go | 1 - pkg/sync/http/http_sync.go | 6 +++--- pkg/sync/http/http_sync_test.go | 16 ++++++---------- pkg/sync/kubernetes/kubernetes_sync.go | 1 - 6 files changed, 13 insertions(+), 22 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 0d38cdfd7..6b75016f1 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -135,8 +135,8 @@ func (r *Runtime) newHTTP(config sync.ProviderConfig, logger *logger.Logger) *ht zap.String("component", "sync"), zap.String("sync", "remote"), ), - Config: config, - Cron: cron.New(), + BearerToken: config.BearerToken, + Cron: cron.New(), } } @@ -146,8 +146,7 @@ func (r *Runtime) newK8s(config sync.ProviderConfig, logger *logger.Logger) *kub zap.String("component", "sync"), zap.String("sync", "kubernetes"), ), - URI: config.URI, - Config: config, + URI: config.URI, } } @@ -158,8 +157,7 @@ func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *fi zap.String("component", "sync"), zap.String("sync", "filepath"), ), - Config: config, - Mux: &msync.RWMutex{}, + Mux: &msync.RWMutex{}, } } diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index 6cb02c52a..7ec703c33 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -19,7 +19,6 @@ import ( type Sync struct { URI string Logger *logger.Logger - Config sync.ProviderConfig Source string // FileType indicates the file type e.g., json, yaml/yml etc., fileType string diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 0241b014b..d1fb80641 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -39,7 +39,6 @@ type Sync struct { options []grpc.DialOption ready bool Mux *msync.RWMutex - Config sync.ProviderConfig } func (g *Sync) connectClient(ctx context.Context) error { diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index c3c59cc93..9f7ea3174 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -22,7 +22,7 @@ type Sync struct { LastBodySHA string Logger *logger.Logger ready bool - Config sync.ProviderConfig + BearerToken string } // Client defines the behaviour required of a http client @@ -116,8 +116,8 @@ func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) ([]byte, error req.Header.Add("Accept", "application/json") - if hs.Config.BearerToken != "" { - bearer := fmt.Sprintf("Bearer %s", hs.Config.BearerToken) + if hs.BearerToken != "" { + bearer := fmt.Sprintf("Bearer %s", hs.BearerToken) req.Header.Set("Authorization", bearer) } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 187fbdbf2..dc1c991cd 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -147,11 +147,9 @@ func TestHTTPSync_Fetch(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - Config: sync.ProviderConfig{ - BearerToken: tt.bearerToken, - }, + URI: tt.uri, + Client: mockClient, + BearerToken: tt.bearerToken, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } @@ -223,11 +221,9 @@ func TestHTTPSync_Resync(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - Config: sync.ProviderConfig{ - BearerToken: tt.bearerToken, - }, + URI: tt.uri, + Client: mockClient, + BearerToken: tt.bearerToken, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index a406f0458..179889c02 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -27,7 +27,6 @@ var resyncPeriod = 1 * time.Minute type Sync struct { Logger *logger.Logger - Config sync.ProviderConfig client client.Client URI string ready bool From a522f566c8cc66bf78873f78c29d8a374596021b Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 7 Mar 2023 16:50:59 +0000 Subject: [PATCH 16/24] rename sync providers to sources Signed-off-by: James Milligan --- cmd/start.go | 12 ++++++------ docs/configuration/configuration.md | 8 ++++---- docs/configuration/flagd_start.md | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 702d3cfa6..a03623ada 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -25,7 +25,7 @@ const ( serverCertPathFlagName = "server-cert-path" serverKeyPathFlagName = "server-key-path" socketPathFlagName = "socket-path" - syncProvidersFlagName = "sync-providers" + sourcesFlagName = "sources" syncProviderFlagName = "sync-provider" uriFlagName = "uri" ) @@ -61,7 +61,7 @@ func init() { syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) flags.StringP( - syncProvidersFlagName, "s", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ + sourcesFlagName, "s", "", "JSON representation of an array of ProviderConfig objects. This object contains "+ "2 required fields, uri (string) and provider (string). Documentation for this object can be found here: "+ "https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation", ) @@ -78,7 +78,7 @@ func init() { _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) _ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName)) - _ = viper.BindPFlag(syncProvidersFlagName, flags.Lookup(syncProvidersFlagName)) + _ = viper.BindPFlag(sourcesFlagName, flags.Lookup(sourcesFlagName)) _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) } @@ -121,13 +121,13 @@ var startCmd = &cobra.Command{ } syncProvidersFromConfig := []sync.ProviderConfig{} - if cfgFile == "" && viper.GetString(syncProvidersFlagName) != "" { - syncProvidersFromConfig, err = runtime.SyncProviderArgPass(viper.GetString(syncProvidersFlagName)) + if cfgFile == "" && viper.GetString(sourcesFlagName) != "" { + syncProvidersFromConfig, err = runtime.SyncProviderArgPass(viper.GetString(sourcesFlagName)) if err != nil { log.Fatal(err) } } else { - err = viper.UnmarshalKey(syncProvidersFlagName, &syncProvidersFromConfig) + err = viper.UnmarshalKey(sourcesFlagName, &syncProvidersFromConfig) if err != nil { log.Fatal(err) } diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 087f81819..a12cb7516 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -36,9 +36,9 @@ To use an existing FeatureFlagConfiguration custom resource, start flagD with th flagd start --uri core.openfeature.dev/default/my_example ``` -### Sync Provider Configuration +### Source Configuration -While a URI may be passed to flagd via the `--uri` flag, some implementations may require further configurations. In these cases the `--sync-providers` flag should be used. +While a URI may be passed to flagd via the `--uri` flag, some implementations may require further configurations. In these cases the `--sources` flag should be used. The flag takes a string argument, which should be a JSON representation of an array of `ProviderConfig` objects. Alternatively, these configurations should be passed to flagd via config file, specified using the `--config` flag. @@ -52,11 +52,11 @@ The `uri` field values do not need to follow the [URI patterns](#uri-patterns), Example start command using a filepath sync provider and the equivalent config file definition: ```sh -./flagd start --sync-providers=\[{\"uri\":\"config/samples/example_flags.json\"\,\"provider\":\"file\"}\] +./flagd start --sources=\[{\"uri\":\"config/samples/example_flags.json\"\,\"provider\":\"file\"}\] ``` ```yaml -sync-providers: +sources: - uri: config/samples/example_flags.json provider: file ``` \ No newline at end of file diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 3fe9e21ae..369ab46f3 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -19,9 +19,9 @@ flagd start [flags] -c, --server-cert-path string Server side tls certificate path -k, --server-key-path string Server side tls key path -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. + -s, --sources string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString DEPRECATED: Sync provider arguments as key values separated by = (default []) - -s, --sync-providers string JSON representation of an array of ProviderConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object can be found here: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md#sync-provider-customisation -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` From 5019af86027fa5166b3e9583f18f56ce1a72ffe0 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 7 Mar 2023 16:53:29 +0000 Subject: [PATCH 17/24] reintroduce deprecation of --sync-provider Signed-off-by: James Milligan --- cmd/start.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/start.go b/cmd/start.go index a03623ada..086352a32 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -105,6 +105,11 @@ var startCmd = &cobra.Command{ rtLogger.Info(fmt.Sprintf("flagd version: %s (%s), built at: %s", Version, Commit, Date)) + if viper.GetString(syncProviderFlagName) != "" { + rtLogger.Warn("DEPRECATED: The --sync-provider flag has been deprecated. " + + "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") + } + if viper.GetString(evaluatorFlagName) != "json" { rtLogger.Warn("DEPRECATED: The --evaluator flag has been deprecated. " + "Docs: https://github.com/open-feature/flagd/blob/main/docs/configuration/configuration.md") From 33644c33f42b7080c1e357020df7448b76f54c53 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 7 Mar 2023 16:54:34 +0000 Subject: [PATCH 18/24] rename func Signed-off-by: James Milligan --- cmd/start.go | 2 +- pkg/runtime/from_config.go | 2 +- pkg/runtime/runtime_test.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 086352a32..8d1188bf8 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -127,7 +127,7 @@ var startCmd = &cobra.Command{ syncProvidersFromConfig := []sync.ProviderConfig{} if cfgFile == "" && viper.GetString(sourcesFlagName) != "" { - syncProvidersFromConfig, err = runtime.SyncProviderArgPass(viper.GetString(sourcesFlagName)) + syncProvidersFromConfig, err = runtime.SyncProviderArgParse(viper.GetString(sourcesFlagName)) if err != nil { log.Fatal(err) } diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 6b75016f1..849533439 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -161,7 +161,7 @@ func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *fi } } -func SyncProviderArgPass(syncProviders string) ([]sync.ProviderConfig, error) { +func SyncProviderArgParse(syncProviders string) ([]sync.ProviderConfig, error) { syncProvidersParsed := []sync.ProviderConfig{} if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil { return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err) diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index f1321a985..8334d62a0 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -8,7 +8,7 @@ import ( "github.com/open-feature/flagd/pkg/sync" ) -func TestSyncProviderArgPass(t *testing.T) { +func TestSyncProviderArgParse(t *testing.T) { test := map[string]struct { in string expectErr bool @@ -66,7 +66,7 @@ func TestSyncProviderArgPass(t *testing.T) { for name, tt := range test { t.Run(name, func(t *testing.T) { - out, err := runtime.SyncProviderArgPass(tt.in) + out, err := runtime.SyncProviderArgParse(tt.in) if tt.expectErr { if err == nil { t.Error("expected error, got none") From d7901ff83d59f0128f702bd2e47334e29ada7986 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 10:46:40 +0000 Subject: [PATCH 19/24] wip Signed-off-by: James Milligan --- pkg/sync/grpc/grpc_sync.go | 1 + pkg/sync/http/http_sync_test.go | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 892780c29..d2c937388 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -40,6 +40,7 @@ type Sync struct { options []grpc.DialOption ready bool Mux *msync.RWMutex + Config sync.SyncProviderConfig } func (g *Sync) connectClient(ctx context.Context) error { diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index d7e9c647b..0c683d240 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -147,9 +147,11 @@ func TestHTTPSync_Fetch(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - BearerToken: tt.bearerToken, + URI: tt.uri, + Client: mockClient, + Config: sync.SyncProviderConfig{ + BearerToken: tt.bearerToken, + }, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } From dbc886e2dc9cf30e0988939aa8844c4e72d121e8 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Tue, 28 Feb 2023 15:53:11 +0000 Subject: [PATCH 20/24] linting Signed-off-by: James Milligan --- pkg/sync/grpc/grpc_sync.go | 2 +- pkg/sync/http/http_sync_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index d2c937388..106b81256 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -40,7 +40,7 @@ type Sync struct { options []grpc.DialOption ready bool Mux *msync.RWMutex - Config sync.SyncProviderConfig + Config sync.ProviderConfig } func (g *Sync) connectClient(ctx context.Context) error { diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 0c683d240..9b5ff9596 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -149,7 +149,7 @@ func TestHTTPSync_Fetch(t *testing.T) { httpSync := Sync{ URI: tt.uri, Client: mockClient, - Config: sync.SyncProviderConfig{ + Config: sync.ProviderConfig{ BearerToken: tt.bearerToken, }, LastBodySHA: tt.lastBodySHA, From 09c6d138691ba775e520ce2abeeaf04a43284a96 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 6 Mar 2023 10:48:21 -0800 Subject: [PATCH 21/24] init grpc tls con Signed-off-by: Kavindu Dodanduwa --- pkg/runtime/from_config.go | 2 +- pkg/sync/grpc/grpc_sync.go | 137 +++++++++----- pkg/sync/grpc/grpc_sync_test.go | 319 ++++++++++++++++++++++++++++---- 3 files changed, 375 insertions(+), 83 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 849533439..5891049ed 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -116,7 +116,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { func (r *Runtime) newGRPC(config sync.ProviderConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ - Target: grpc.URLToGRPCTarget(config.URI), + URI: config.URI, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 106b81256..c16fd6531 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -2,12 +2,17 @@ package grpc import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "math" + "os" "strings" msync "sync" "time" + "google.golang.org/grpc/credentials" + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" @@ -18,9 +23,10 @@ import ( ) const ( - // Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate - // remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. - Prefix = "grpc://" + // Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote + // URLs for REST APIs (i.e - HTTP) from GRPC endpoints. + Prefix = "grpc://" + PrefixSecure = "grpcs://" // Connection retry constants // Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach @@ -28,36 +34,44 @@ const ( backOffLimit = 3 backOffBase = 4 constantBackOffDelay = 60 + + tlsVersion = tls.VersionTLS12 ) type Sync struct { - Target string + URI string ProviderID string + CertPath string Logger *logger.Logger - - syncClient syncv1grpc.FlagSyncService_SyncFlagsClient - client syncv1grpc.FlagSyncServiceClient - options []grpc.DialOption - ready bool Mux *msync.RWMutex Config sync.ProviderConfig + + client syncv1grpc.FlagSyncServiceClient + ready bool } -func (g *Sync) connectClient(ctx context.Context) error { - // initial dial and connection. Failure here must result in a startup failure - dial, err := grpc.DialContext(ctx, g.Target, g.options...) +func (g *Sync) Init(ctx context.Context) error { + tCredentials, err := buildTransportCredentials(g.URI, g.CertPath) if err != nil { + g.Logger.Error(fmt.Sprintf("error building transport credentials: %s", err.Error())) return err } - g.client = syncv1grpc.NewFlagSyncServiceClient(dial) + target, ok := sourceToGRPCTarget(g.URI) + if !ok { + return fmt.Errorf("invalid grpc source: %s", g.URI) + } - syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + // Derive reusable client connection + rpcCon, err := grpc.DialContext(ctx, target, grpc.WithTransportCredentials(tCredentials)) if err != nil { - g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error())) + g.Logger.Error(fmt.Sprintf("error initiating grpc client connection: %s", err.Error())) return err } - g.syncClient = syncClient + + // Setup service client + g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon) + return nil } @@ -69,21 +83,12 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error } dataSync <- sync.DataSync{ FlagData: res.GetFlagConfiguration(), - Source: g.Target, + Source: g.URI, Type: sync.ALL, } return nil } -func (g *Sync) Init(ctx context.Context) error { - g.options = []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - - // initial dial and connection. Failure here must result in a startup failure - return g.connectClient(ctx) -} - func (g *Sync) IsReady() bool { g.Mux.RLock() defer g.Mux.RUnlock() @@ -99,7 +104,13 @@ func (g *Sync) setReady(val bool) { func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening g.setReady(true) - err := g.handleFlagSync(g.syncClient, dataSync) + + syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + if err != nil { + return nil + } + + err = g.handleFlagSync(syncClient, dataSync) if err == nil { return nil } @@ -149,12 +160,7 @@ func (g *Sync) connectWithRetry( return nil, false } - g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target)) - - if err := g.connectClient(ctx); err != nil { - g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error())) - continue - } + g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.URI)) syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { @@ -162,7 +168,7 @@ func (g *Sync) connectWithRetry( continue } - g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target)) + g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.URI)) return syncClient, true } } @@ -179,7 +185,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_ALL: dataSync <- sync.DataSync{ FlagData: data.FlagConfiguration, - Source: g.Target, + Source: g.URI, Type: sync.ALL, } @@ -187,7 +193,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.FlagConfiguration, - Source: g.Target, + Source: g.URI, Type: sync.ADD, } @@ -195,7 +201,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.FlagConfiguration, - Source: g.Target, + Source: g.URI, Type: sync.UPDATE, } @@ -203,7 +209,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.FlagConfiguration, - Source: g.Target, + Source: g.URI, Type: sync.DELETE, } @@ -216,14 +222,57 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, } } -// URLToGRPCTarget is a helper to derive GRPC target from a provided URL +// buildTransportCredentials is a helper to build grpc credentials.TransportCredentials based on source and cert path +func buildTransportCredentials(source string, certPath string) (credentials.TransportCredentials, error) { + if strings.Contains(source, Prefix) { + return insecure.NewCredentials(), nil + } + + if !strings.Contains(source, PrefixSecure) { + return nil, fmt.Errorf("invalid source. grpc source must contain prefix %s or %s", Prefix, PrefixSecure) + } + + if certPath == "" { + // Rely on CA certs provided from system + return credentials.NewTLS(&tls.Config{MinVersion: tlsVersion}), nil + } + + // Rely on provided certificate + certBytes, err := os.ReadFile(certPath) + if err != nil { + return nil, err + } + + cp := x509.NewCertPool() + if !cp.AppendCertsFromPEM(certBytes) { + return nil, fmt.Errorf("invalid certificate provided at path: %s", certPath) + } + + return credentials.NewTLS(&tls.Config{ + MinVersion: tlsVersion, + RootCAs: cp, + }), nil +} + +// sourceToGRPCTarget is a helper to derive GRPC target from a provided URL // For example, function returns the target localhost:9090 for the input grpc://localhost:9090 -func URLToGRPCTarget(url string) string { - index := strings.Split(url, Prefix) +func sourceToGRPCTarget(url string) (string, bool) { + var separator string + + switch { + case strings.Contains(url, Prefix): + separator = Prefix + case strings.Contains(url, PrefixSecure): + separator = PrefixSecure + default: + return "", false + } + + index := strings.Split(url, separator) - if len(index) == 2 { - return index[1] + if len(index) == 2 && len(index[1]) != 0 { + return index[1], true } - return index[0] + return "", false } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 0c7e84ba8..f35da07d8 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -7,8 +7,10 @@ import ( "io" "log" "net" + "os" msync "sync" "testing" + "time" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" @@ -20,6 +22,34 @@ import ( "google.golang.org/grpc/test/bufconn" ) +const sampleCert = `-----BEGIN CERTIFICATE----- +MIIEnDCCAoQCCQCHcl3hGXwRQzANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDDAVm +bGFnZDAeFw0yMzAyMTAxODM1NDVaFw0zMzAyMDcxODM1NDVaMBAxDjAMBgNVBAMM +BWZsYWdkMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAwDLEAUti/kG9 +MhJLtO7oAy7diHxWKDFmsIHrE+z2IzTxjXxVHQLv1HiYB/UN75y7qlb3MwvzSc+C +BoLuoiM0PDiMio9/o9X5j0U+v3H1JpUU5LardkvsprFqJWmHF+D7aRdM0LBLn2X6 +HQOhSnPyH9Qjl2l2tyPiPTZ6g0i2+rXZsNUoTs4fm6ThhZ0LeXR8KDmCTun3ze1d +hXA7ydxwILH2OVc+Wnzl30+BRvOiLQbc9nYnwSREFeIy8sFbhrTHqSNn3eY79ssZ +T6f4tN3jEV1d7NqoFk9KFLJKJhMt7smMB9NLwVWi581Zj1krYirNlP6mtmPrn3kJ +lsgT15kFftShMVcYFSHqOSLiy4SspHGK8KJaFoEVx0wp/weRwrWXi6vWg7tuHATH +fw7gW/9CyV+ylc0pJ002wtPAgzJYUaOrna0R2r3yQsSzRcDnqsm4FLkPHLoyjrwQ +vshKcEqjhGml1M+lTDEo3RO5ZoQ3ZN2AZKPDrK2zGG4wFJjHRu9FtutOEZkYYOzA +emTQWW8US3q8WVQqGl/EwQqzXk9Lco7uhLdXmqVOvAi6z01gehQJPnjhH7iqAPVp +1tlOBHit1F3sTAQIO/2zff3LCKiD2d27KINh4aFEyDbDmglPA8VPO3BMQVSjFlxj +K1s2G1IDBixXK76VmBP+ZpvxOaQtYIUCAwEAATANBgkqhkiG9w0BAQsFAAOCAgEA +K9+wnl5gpkfNBa+OSxlhOn3CKhcaW/SWZ4aLw2yK1NZNnNjpwUcLQScUDBKDoJJR +5roc3PIImX7hdnobZWqFhD23laaAlu5XLk9P7n51uMEiNjQQc2WaaBZDTRJfki1C +MvPskXqptgPsVyuPJc0DxfaCz7pDYjq/CtJ+osaj404P5mlO1QJ8W91QSx+aq2x4 +uUTUWuyr/8flIcxiX0o8VTb2LcUvWZBMGa3CdeLnPHrOjovfjJFy0Ysk3SGEACLL +9mpbNbv23v9UXVfyFffHpyzvyUJIOsNXG0O1AYf5t9bukqHolGR/RQUN4yGd3M62 +mFR5bOST36DjNSzTrx1eyCLv22+h9VVlWFPrebFnq1W5SSi8PtsGSMjhvX7dB1kS +t0yJtlj2HwBAvI1zVKG76q6neSU51UXFQUbO0OA0sxjicEOlNfXnShM/kY2lobpX +hrCysWpqoSS0S3UBvmuRiraLWkP1KueC0XHoAi8yuwMAdM6Y+h2OJpnO0PdpUmrp +lAqdxbyICnB1Nsm5QGGm6Pxd8lEbQ9ZSwFjgqApjT2zVhuaaUC7jdlEP1H5snt9n +8FQR06lrzGyW04ud9pd6MXJup1oghAlvnzXioAH2Az0IXcHvqUGZQattFv27OXqj +QZ6ayNO119SNscvC6Qe9GLlbBEHDQWKPiftnS2Mh6Do= +-----END CERTIFICATE-----` + func Test_ReSyncTests(t *testing.T) { const target = "localBufCon" @@ -77,7 +107,7 @@ func Test_ReSyncTests(t *testing.T) { c := syncv1grpc.NewFlagSyncServiceClient(dial) grpcSync := Sync{ - Target: target, + URI: target, ProviderID: "", Logger: logger.NewLogger(nil, false), client: c, @@ -111,32 +141,60 @@ func Test_ReSyncTests(t *testing.T) { } } -func TestUrlToGRPCTarget(t *testing.T) { +func TestSourceToGRPCTarget(t *testing.T) { tests := []struct { name string url string want string + ok bool }{ { name: "With Prefix", url: "grpc://test.com/endpoint", want: "test.com/endpoint", + ok: true, }, { - name: "Without Prefix", - url: "test.com/endpoint", + name: "With secure Prefix", + url: "grpcs://test.com/endpoint", want: "test.com/endpoint", + ok: true, }, { - name: "Empty is empty", + name: "Empty is error", url: "", want: "", + ok: false, + }, + { + name: "Invalid is error", + url: "https://test.com/endpoint", + want: "", + ok: false, + }, + { + name: "Prefix is not enough I", + url: Prefix, + want: "", + ok: false, + }, + { + name: "Prefix is not enough II", + url: PrefixSecure, + want: "", + ok: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := URLToGRPCTarget(tt.url); got != tt.want { - t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want) + got, ok := sourceToGRPCTarget(tt.url) + + if tt.ok != ok { + t.Errorf("URLToGRPCTarget() returned = %v, want %v", ok, tt.ok) + } + + if got != tt.want { + t.Errorf("URLToGRPCTarget() returned = %v, want %v", got, tt.want) } }) } @@ -144,7 +202,7 @@ func TestUrlToGRPCTarget(t *testing.T) { func TestSync_BasicFlagSyncStates(t *testing.T) { grpcSyncImpl := Sync{ - Target: "grpc://test", + URI: "grpc://test", ProviderID: "", Logger: logger.NewLogger(nil, false), Mux: &msync.RWMutex{}, @@ -152,16 +210,18 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { tests := []struct { name string - stream syncv1grpc.FlagSyncService_SyncFlagsClient + stream syncv1grpc.FlagSyncServiceClient want sync.Type ready bool }{ { name: "State All maps to Sync All", - stream: &SimpleRecvMock{ - mockResponse: v1.SyncFlagsResponse{ - FlagConfiguration: "{}", - State: v1.SyncState_SYNC_STATE_ALL, + stream: &MockServiceClient{ + mockStream: SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ALL, + }, }, }, want: sync.ALL, @@ -169,10 +229,12 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { }, { name: "State Add maps to Sync Add", - stream: &SimpleRecvMock{ - mockResponse: v1.SyncFlagsResponse{ - FlagConfiguration: "{}", - State: v1.SyncState_SYNC_STATE_ADD, + stream: &MockServiceClient{ + mockStream: SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ADD, + }, }, }, want: sync.ADD, @@ -180,10 +242,12 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { }, { name: "State Update maps to Sync Update", - stream: &SimpleRecvMock{ - mockResponse: v1.SyncFlagsResponse{ - FlagConfiguration: "{}", - State: v1.SyncState_SYNC_STATE_UPDATE, + stream: &MockServiceClient{ + mockStream: SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_UPDATE, + }, }, }, want: sync.UPDATE, @@ -191,10 +255,12 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { }, { name: "State Delete maps to Sync Delete", - stream: &SimpleRecvMock{ - mockResponse: v1.SyncFlagsResponse{ - FlagConfiguration: "{}", - State: v1.SyncState_SYNC_STATE_DELETE, + stream: &MockServiceClient{ + mockStream: SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_DELETE, + }, }, }, want: sync.DELETE, @@ -207,12 +273,13 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { syncChan := make(chan sync.DataSync) go func() { - grpcSyncImpl.syncClient = test.stream + grpcSyncImpl.client = test.stream err := grpcSyncImpl.Sync(context.TODO(), syncChan) if err != nil { t.Errorf("Error handling flag sync: %s", err.Error()) } }() + data := <-syncChan if grpcSyncImpl.IsReady() != test.ready { @@ -331,13 +398,6 @@ func Test_StreamListener(t *testing.T) { // start server go serve(&bufServer) - grpcSync := Sync{ - Target: target, - ProviderID: "", - Logger: logger.NewLogger(nil, false), - Mux: &msync.RWMutex{}, - } - // initialize client dial, err := grpc.Dial(target, grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { @@ -349,16 +409,20 @@ func Test_StreamListener(t *testing.T) { } serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) - syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: grpcSync.ProviderID}) - if err != nil { - t.Errorf("Error opening client stream: %s", err.Error()) + + grpcSync := Sync{ + URI: target, + ProviderID: "", + Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, + + client: serviceClient, } syncChan := make(chan sync.DataSync, 1) // listen to stream go func() { - grpcSync.syncClient = syncClient err := grpcSync.Sync(context.TODO(), syncChan) if err != nil { // must ignore EOF as this is returned for stream end @@ -387,8 +451,187 @@ func Test_StreamListener(t *testing.T) { } } +func Test_BuildTCredentials(t *testing.T) { + // "insecure" is a hardcoded term at insecure.NewCredentials + const insecure = "insecure" + // "tls" is a hardcoded term at tlsCreds.Info + const tls = "tls" + // local test file with valid certificate + const validCertFile = "valid.cert" + // local test file with invalid certificate + const invalidCertFile = "invalid.cert" + + // init cert files for tests & cleanup with a deffer + err := os.WriteFile(validCertFile, []byte(sampleCert), 0o600) + if err != nil { + t.Errorf("error creating valid certificate file: %s", err) + } + + err = os.WriteFile(invalidCertFile, []byte("--certificate--"), 0o600) + if err != nil { + t.Errorf("error creating invalid certificate file: %s", err) + } + + defer func() { + errV := os.Remove(validCertFile) + errI := os.Remove(invalidCertFile) + if errV != nil || errI != nil { + t.Errorf("error removing cerificate files: %v, %v", errV, errI) + } + }() + + tests := []struct { + name string + source string + certPath string + expectSecProto string + error bool + }{ + { + name: "Insecure source results in insecure connection", + source: Prefix + "some.domain", + certPath: "", + expectSecProto: insecure, + }, + { + name: "Secure source results in secure connection", + source: PrefixSecure + "some.domain", + certPath: validCertFile, + expectSecProto: tls, + }, + { + name: "Secure source with no certificate results in a secure connection", + source: PrefixSecure + "some.domain", + expectSecProto: tls, + }, + { + name: "Invalid cert path results in an error", + source: PrefixSecure + "some.domain", + certPath: "invalid/path", + error: true, + }, + { + name: "Invalid certificate results in an error", + source: PrefixSecure + "some.domain", + certPath: invalidCertFile, + error: true, + }, + { + name: "Invalid prefix results in an error", + source: "http://some.domain", + error: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tCred, err := buildTransportCredentials(test.source, test.certPath) + + if test.error { + if err == nil { + t.Errorf("test expected non error execution. But resulted in an error: %s", err.Error()) + } + + // Test expected an error. Nothing to validate further + return + } + + // check for errors to be certain + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + + protoc := tCred.Info().SecurityProtocol + if protoc != test.expectSecProto { + t.Errorf("buildTransportCredentials() returned protocol= %v, want %v", protoc, test.expectSecProto) + } + }) + } +} + +// Test_ConnectWithRetry is an attempt to validate grpc.connectWithRetry behavior +func Test_ConnectWithRetry(t *testing.T) { + target := "grpc://local" + bufListener := bufconn.Listen(1) + // buffer based server. response ignored purposefully + bServer := bufferedServer{listener: bufListener} + + // generate a client connection backed with bufconn + clientConn, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufListener.DialContext(ctx) + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("error initiating the connection: %s", err.Error()) + } + + // minimal sync provider + grpcSync := Sync{ + Logger: logger.NewLogger(nil, false), + client: syncv1grpc.NewFlagSyncServiceClient(clientConn), + } + + // test must complete within an acceptable timeframe + tCtx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFunc() + + // channel for connection + clientChan := make(chan syncv1grpc.FlagSyncService_SyncFlagsClient) + + // start connection retry attempts + go func() { + client, ok := grpcSync.connectWithRetry(tCtx) + if !ok { + clientChan <- nil + } + + clientChan <- client + }() + + // Wait for retries in the background + select { + case <-time.After(2 * time.Second): + break + case <-tCtx.Done(): + // We should not reach this with correct test setup, but in case we do + cancelFunc() + t.Errorf("timeout occurred while waiting for conditions to fulfil") + } + + // start the server - fulfill connection after the wait + go serve(&bServer) + + // Wait for client connection + var con syncv1grpc.FlagSyncService_SyncFlagsClient + + select { + case con = <-clientChan: + break + case <-tCtx.Done(): + cancelFunc() + t.Errorf("timeout occurred while waiting for conditions to fulfil") + } + + if con == nil { + t.Errorf("received a nil value when expecting a non-nil return") + } +} + // Mock implementations +type MockServiceClient struct { + syncv1grpc.FlagSyncServiceClient + + mockStream SimpleRecvMock +} + +func (c *MockServiceClient) SyncFlags(_ context.Context, + _ *v1.SyncFlagsRequest, _ ...grpc.CallOption, +) (syncv1grpc.FlagSyncService_SyncFlagsClient, error) { + return &c.mockStream, nil +} + type SimpleRecvMock struct { grpc.ClientStream mockResponse v1.SyncFlagsResponse @@ -398,7 +641,7 @@ func (s *SimpleRecvMock) Recv() (*v1.SyncFlagsResponse, error) { return &s.mockResponse, nil } -// serve serves a bufferedServer +// serve serves a bufferedServer. This is a blocking call func serve(bServer *bufferedServer) { server := grpc.NewServer() From 6577362bef8435f3eec917af87444d89fd26b179 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Mar 2023 08:22:33 -0800 Subject: [PATCH 22/24] resolve merge conflicts Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 2 +- docs/configuration/configuration.md | 24 ++++++++++-------- pkg/runtime/from_config.go | 39 ++++++++++++++++------------- pkg/runtime/runtime.go | 2 +- pkg/runtime/runtime_test.go | 20 +++++++-------- pkg/sync/grpc/grpc_sync.go | 4 +-- pkg/sync/grpc/grpc_sync_test.go | 2 -- pkg/sync/http/http_sync_test.go | 8 +++--- pkg/sync/isync.go | 3 ++- 9 files changed, 53 insertions(+), 51 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 2b1bf19ce..11c4b279c 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -125,7 +125,7 @@ var startCmd = &cobra.Command{ log.Fatal(err) } - syncProvidersFromConfig := []sync.SourceConfig{} + syncProvidersFromConfig := []sync.ProviderConfig{} if cfgFile == "" && viper.GetString(sourcesFlagName) != "" { syncProvidersFromConfig, err = runtime.SyncProviderArgParse(viper.GetString(sourcesFlagName)) if err != nil { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 3a107b808..2bd254b90 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -14,12 +14,12 @@ Config file expects the keys to have the exact naming as the flags. Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation: -| Sync | Pattern | Example | -|------------|------------------------------------|---------------------------------------| +| Sync | Pattern | Example | +|------------|---------------------------------------|---------------------------------------| | Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` | -| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | -| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | -| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` | +| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | +| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | +| Grpc | `grpc(s)://flag-source-url` | `grpc://my-flags-server` | ### Customising sync providers @@ -42,11 +42,12 @@ While a URI may be passed to flagd via the `--uri` flag, some implementations ma The flag takes a string argument, which should be a JSON representation of an array of `SourceConfig` objects. Alternatively, these configurations should be passed to flagd via config file, specified using the `--config` flag. -| Field | Type | -|------------|------------------------------------| -| uri | required `string` | | -| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) | -| bearerToken | optional `string` | +| Field | Type | Note | +|-------------|------------------------------------------------------------|----------------------------------------------------| +| uri | required `string` | | +| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) | | +| bearerToken | optional `string` | Used for http sync | +| certPath | optional `string` | Used for grpcs sync when TLS certificate is needed | The `uri` field values do not need to follow the [URI patterns](#uri-patterns), the provider type is instead derived from the provider field. If the prefix is supplied, it will be removed on startup without error. @@ -68,4 +69,7 @@ sources: provider: kubernetes - uri: grpc://my-flag-source:8080 provider: grpc +- uri: grpcs://my-flag-source:8080 + provider: grpc + certPath: /certs/ca.cert ``` diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index d5170003b..aad32f36f 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -30,16 +30,18 @@ const ( ) var ( - regCrd *regexp.Regexp - regURL *regexp.Regexp - regGRPC *regexp.Regexp - regFile *regexp.Regexp + regCrd *regexp.Regexp + regURL *regexp.Regexp + regGRPC *regexp.Regexp + regGRPCSecure *regexp.Regexp + regFile *regexp.Regexp ) func init() { regCrd = regexp.MustCompile("^core.openfeature.dev/") regURL = regexp.MustCompile("^https?://") regGRPC = regexp.MustCompile("^" + grpc.Prefix) + regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure) regFile = regexp.MustCompile("^file:") } @@ -118,17 +120,18 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { return nil } -func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { +func (r *Runtime) newGRPC(config sync.ProviderConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ - Target: grpc.URLToGRPCTarget(config.URI), + URI: config.URI, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), ), + CertPath: config.CertPath, } } -func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { +func (r *Runtime) newHTTP(config sync.ProviderConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ URI: config.URI, Client: &http.Client{ @@ -159,9 +162,9 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e ), nil } -func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { +func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), + URI: config.URI, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "filepath"), @@ -170,8 +173,8 @@ func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file } } -func SyncProviderArgParse(syncProviders string) ([]sync.SourceConfig, error) { - syncProvidersParsed := []sync.SourceConfig{} +func SyncProviderArgParse(syncProviders string) ([]sync.ProviderConfig, error) { + syncProvidersParsed := []sync.ProviderConfig{} if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil { return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err) } @@ -192,27 +195,27 @@ func SyncProviderArgParse(syncProviders string) ([]sync.SourceConfig, error) { return syncProvidersParsed, nil } -func SyncProvidersFromURIs(uris []string) ([]sync.SourceConfig, error) { - syncProvidersParsed := []sync.SourceConfig{} +func SyncProvidersFromURIs(uris []string) ([]sync.ProviderConfig, error) { + syncProvidersParsed := []sync.ProviderConfig{} for _, uri := range uris { switch uriB := []byte(uri); { case regFile.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: regFile.ReplaceAllString(uri, ""), Provider: syncProviderFile, }) case regCrd.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: regCrd.ReplaceAllString(uri, ""), Provider: syncProviderKubernetes, }) case regURL.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: uri, Provider: syncProviderHTTP, }) - case regGRPC.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + case regGRPC.Match(uriB), regGRPCSecure.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ URI: uri, Provider: syncProviderGrpc, }) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index d6d242dfc..0a18f4e7e 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,7 +33,7 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - SyncProviders []sync.SourceConfig + SyncProviders []sync.ProviderConfig CORS []string } diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 84da5ebd0..8334d62a0 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -12,12 +12,12 @@ func TestSyncProviderArgParse(t *testing.T) { test := map[string]struct { in string expectErr bool - out []sync.SourceConfig + out []sync.ProviderConfig }{ "simple": { in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]", expectErr: false, - out: []sync.SourceConfig{ + out: []sync.ProviderConfig{ { URI: "config/samples/example_flags.json", Provider: "file", @@ -32,7 +32,7 @@ func TestSyncProviderArgParse(t *testing.T) { {"uri":"default/my-crd","provider":"kubernetes"} ]`, expectErr: false, - out: []sync.SourceConfig{ + out: []sync.ProviderConfig{ { URI: "config/samples/example_flags.json", Provider: "file", @@ -55,12 +55,12 @@ func TestSyncProviderArgParse(t *testing.T) { "empty": { in: `[]`, expectErr: false, - out: []sync.SourceConfig{}, + out: []sync.ProviderConfig{}, }, "parse-failure": { in: ``, expectErr: true, - out: []sync.SourceConfig{}, + out: []sync.ProviderConfig{}, }, } @@ -85,14 +85,14 @@ func TestSyncProvidersFromURIs(t *testing.T) { test := map[string]struct { in []string expectErr bool - out []sync.SourceConfig + out []sync.ProviderConfig }{ "simple": { in: []string{ "file:my-file.json", }, expectErr: false, - out: []sync.SourceConfig{ + out: []sync.ProviderConfig{ { URI: "my-file.json", Provider: "file", @@ -107,7 +107,7 @@ func TestSyncProvidersFromURIs(t *testing.T) { "core.openfeature.dev/default/my-crd", }, expectErr: false, - out: []sync.SourceConfig{ + out: []sync.ProviderConfig{ { URI: "my-file.json", Provider: "file", @@ -129,12 +129,12 @@ func TestSyncProvidersFromURIs(t *testing.T) { "empty": { in: []string{}, expectErr: false, - out: []sync.SourceConfig{}, + out: []sync.ProviderConfig{}, }, "parse-failure": { in: []string{"care.openfeature.dev/will/fail"}, expectErr: true, - out: []sync.SourceConfig{}, + out: []sync.ProviderConfig{}, }, } diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 10e422736..685427f34 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -45,8 +45,6 @@ type Sync struct { ProviderID string CertPath string Logger *logger.Logger - Mux *msync.RWMutex - Config sync.ProviderConfig client syncv1grpc.FlagSyncServiceClient ready bool @@ -98,7 +96,7 @@ func (g *Sync) IsReady() bool { func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - return nil + return err } // initial stream listening diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index a85e7db80..71f361ce5 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -8,7 +8,6 @@ import ( "log" "net" "os" - msync "sync" "testing" "time" @@ -413,7 +412,6 @@ func Test_StreamListener(t *testing.T) { URI: target, ProviderID: "", Logger: logger.NewLogger(nil, false), - Mux: &msync.RWMutex{}, client: serviceClient, } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 359169f90..655de5a7c 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -142,11 +142,9 @@ func TestHTTPSync_Fetch(t *testing.T) { tt.setup(t, mockClient) httpSync := Sync{ - URI: tt.uri, - Client: mockClient, - Config: sync.ProviderConfig{ - BearerToken: tt.bearerToken, - }, + URI: tt.uri, + Client: mockClient, + BearerToken: tt.bearerToken, LastBodySHA: tt.lastBodySHA, Logger: logger.NewLogger(nil, false), } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 82ca6ee27..27f4ebb59 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -58,9 +58,10 @@ type DataSync struct { Type } -type SourceConfig struct { +type ProviderConfig struct { URI string `json:"uri"` Provider string `json:"provider"` BearerToken string `json:"bearerToken,omitempty"` + CertPath string `json:"certPath,omitempty"` } From a77be56f06cb8ea7f1ad8aa20973c4a30018b683 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Mar 2023 08:28:18 -0800 Subject: [PATCH 23/24] fix naming Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 2 +- pkg/runtime/from_config.go | 22 +++++++++++----------- pkg/runtime/runtime.go | 2 +- pkg/runtime/runtime_test.go | 20 ++++++++++---------- pkg/sync/isync.go | 2 +- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 11c4b279c..2b1bf19ce 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -125,7 +125,7 @@ var startCmd = &cobra.Command{ log.Fatal(err) } - syncProvidersFromConfig := []sync.ProviderConfig{} + syncProvidersFromConfig := []sync.SourceConfig{} if cfgFile == "" && viper.GetString(sourcesFlagName) != "" { syncProvidersFromConfig, err = runtime.SyncProviderArgParse(viper.GetString(sourcesFlagName)) if err != nil { diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index aad32f36f..590911213 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -120,7 +120,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { return nil } -func (r *Runtime) newGRPC(config sync.ProviderConfig, logger *logger.Logger) *grpc.Sync { +func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ URI: config.URI, Logger: logger.WithFields( @@ -131,7 +131,7 @@ func (r *Runtime) newGRPC(config sync.ProviderConfig, logger *logger.Logger) *gr } } -func (r *Runtime) newHTTP(config sync.ProviderConfig, logger *logger.Logger) *httpSync.Sync { +func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ URI: config.URI, Client: &http.Client{ @@ -162,7 +162,7 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e ), nil } -func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *file.Sync { +func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ URI: config.URI, Logger: logger.WithFields( @@ -173,8 +173,8 @@ func (r *Runtime) newFile(config sync.ProviderConfig, logger *logger.Logger) *fi } } -func SyncProviderArgParse(syncProviders string) ([]sync.ProviderConfig, error) { - syncProvidersParsed := []sync.ProviderConfig{} +func SyncProviderArgParse(syncProviders string) ([]sync.SourceConfig, error) { + syncProvidersParsed := []sync.SourceConfig{} if err := json.Unmarshal([]byte(syncProviders), &syncProvidersParsed); err != nil { return syncProvidersParsed, fmt.Errorf("unable to parse sync providers: %w", err) } @@ -195,27 +195,27 @@ func SyncProviderArgParse(syncProviders string) ([]sync.ProviderConfig, error) { return syncProvidersParsed, nil } -func SyncProvidersFromURIs(uris []string) ([]sync.ProviderConfig, error) { - syncProvidersParsed := []sync.ProviderConfig{} +func SyncProvidersFromURIs(uris []string) ([]sync.SourceConfig, error) { + syncProvidersParsed := []sync.SourceConfig{} for _, uri := range uris { switch uriB := []byte(uri); { case regFile.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ URI: regFile.ReplaceAllString(uri, ""), Provider: syncProviderFile, }) case regCrd.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ URI: regCrd.ReplaceAllString(uri, ""), Provider: syncProviderKubernetes, }) case regURL.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ URI: uri, Provider: syncProviderHTTP, }) case regGRPC.Match(uriB), regGRPCSecure.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, sync.ProviderConfig{ + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ URI: uri, Provider: syncProviderGrpc, }) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 0a18f4e7e..d6d242dfc 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,7 +33,7 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - SyncProviders []sync.ProviderConfig + SyncProviders []sync.SourceConfig CORS []string } diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 8334d62a0..84da5ebd0 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -12,12 +12,12 @@ func TestSyncProviderArgParse(t *testing.T) { test := map[string]struct { in string expectErr bool - out []sync.ProviderConfig + out []sync.SourceConfig }{ "simple": { in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]", expectErr: false, - out: []sync.ProviderConfig{ + out: []sync.SourceConfig{ { URI: "config/samples/example_flags.json", Provider: "file", @@ -32,7 +32,7 @@ func TestSyncProviderArgParse(t *testing.T) { {"uri":"default/my-crd","provider":"kubernetes"} ]`, expectErr: false, - out: []sync.ProviderConfig{ + out: []sync.SourceConfig{ { URI: "config/samples/example_flags.json", Provider: "file", @@ -55,12 +55,12 @@ func TestSyncProviderArgParse(t *testing.T) { "empty": { in: `[]`, expectErr: false, - out: []sync.ProviderConfig{}, + out: []sync.SourceConfig{}, }, "parse-failure": { in: ``, expectErr: true, - out: []sync.ProviderConfig{}, + out: []sync.SourceConfig{}, }, } @@ -85,14 +85,14 @@ func TestSyncProvidersFromURIs(t *testing.T) { test := map[string]struct { in []string expectErr bool - out []sync.ProviderConfig + out []sync.SourceConfig }{ "simple": { in: []string{ "file:my-file.json", }, expectErr: false, - out: []sync.ProviderConfig{ + out: []sync.SourceConfig{ { URI: "my-file.json", Provider: "file", @@ -107,7 +107,7 @@ func TestSyncProvidersFromURIs(t *testing.T) { "core.openfeature.dev/default/my-crd", }, expectErr: false, - out: []sync.ProviderConfig{ + out: []sync.SourceConfig{ { URI: "my-file.json", Provider: "file", @@ -129,12 +129,12 @@ func TestSyncProvidersFromURIs(t *testing.T) { "empty": { in: []string{}, expectErr: false, - out: []sync.ProviderConfig{}, + out: []sync.SourceConfig{}, }, "parse-failure": { in: []string{"care.openfeature.dev/will/fail"}, expectErr: true, - out: []sync.ProviderConfig{}, + out: []sync.SourceConfig{}, }, } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 27f4ebb59..115011da2 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -58,7 +58,7 @@ type DataSync struct { Type } -type ProviderConfig struct { +type SourceConfig struct { URI string `json:"uri"` Provider string `json:"provider"` From 8a69ba89b54747284d74a716da9d0295273b948b Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Mar 2023 12:34:26 -0800 Subject: [PATCH 24/24] fix retry con and tests to validate Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 6 ++- pkg/sync/grpc/grpc_sync_test.go | 90 +++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 685427f34..87daecb34 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -94,14 +94,16 @@ func (g *Sync) IsReady() bool { } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + // Initialize SyncFlags client. This fails if server connection establishment fails (ex:- grpc server offline) syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { return err } - // initial stream listening + // Initial stream listening. Error will be logged and continue and retry connection establishment err = g.handleFlagSync(syncClient, dataSync) - if err != nil { + if err == nil { + // This should not happen as handleFlagSync expects to return with an error return err } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 71f361ce5..0a61488a4 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "golang.org/x/sync/errgroup" + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" @@ -615,6 +617,94 @@ func Test_ConnectWithRetry(t *testing.T) { } } +// Test_SyncRetry validates sync and retry attempts +func Test_SyncRetry(t *testing.T) { + // Setup + target := "grpc://local" + bufListener := bufconn.Listen(1) + + expectType := sync.ALL + + // buffer based server. response ignored purposefully + bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{ + { + flags: "{}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }} + + // generate a client connection backed by bufListener + clientConn, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufListener.DialContext(ctx) + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("error initiating the connection: %s", err.Error()) + } + + // minimal sync provider + grpcSync := Sync{ + Logger: logger.NewLogger(nil, false), + client: syncv1grpc.NewFlagSyncServiceClient(clientConn), + } + + // channel for data sync + syncChan := make(chan sync.DataSync, 1) + + // Testing + + // Initial mock server - start mock server backed by a error group. Allow connection and disconnect with a timeout + tCtx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFunc() + + group, _ := errgroup.WithContext(tCtx) + group.Go(func() error { + serve(&bServer) + return nil + }) + + // Start Sync for grpc streaming + go func() { + err := grpcSync.Sync(context.Background(), syncChan) + if err != nil { + t.Errorf("sync start error: %s", err.Error()) + } + }() + + // Check for timeout (not ideal) or data sync (ideal) and cancel the context + select { + case <-tCtx.Done(): + t.Errorf("timeout waiting for conditions to fulfil") + break + case data := <-syncChan: + if data.Type != expectType { + t.Errorf("sync start error: %s", err.Error()) + } + } + + // cancel make error group to complete, making background mock server to exit + cancelFunc() + + // Follow up mock server start - start mock server after initial shutdown + tCtx, cancelFunc = context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFunc() + + // Restart the server + go serve(&bServer) + + // validate connection re-establishment + select { + case <-tCtx.Done(): + cancelFunc() + t.Error("timeout waiting for conditions to fulfil") + case rsp := <-syncChan: + if rsp.Type != expectType { + t.Errorf("expected response: %s, but got: %s", expectType, rsp.Type) + } + } +} + // Mock implementations type MockServiceClient struct {