Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
bearerTokenFlagName = "bearer-token"
corsFlagName = "cors-origin"
evaluatorFlagName = "evaluator"
grpcCertPathFlagName = "grpc-sync-cert-path"
logFormatFlagName = "log-format"
metricsPortFlagName = "metrics-port"
portFlagName = "port"
Expand Down Expand Up @@ -57,10 +58,13 @@ func init() {
syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote",
)
flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ")
flags.StringP(grpcCertPathFlagName, "g", "", "Path to root certificate to be used by TLS enabled grpc"+
" sync (grpcs://). If TLS is used and this configuration is ignored, TLS uses the host's root CA set.")

_ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName))
_ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName))
_ = viper.BindPFlag(evaluatorFlagName, flags.Lookup(evaluatorFlagName))
_ = viper.BindPFlag(grpcCertPathFlagName, flags.Lookup(grpcCertPathFlagName))
_ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName))
_ = viper.BindPFlag(metricsPortFlagName, flags.Lookup(metricsPortFlagName))
_ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName))
Expand Down Expand Up @@ -105,6 +109,7 @@ var startCmd = &cobra.Command{
// Build Runtime -----------------------------------------------------------
rt, err := runtime.FromConfig(logger, runtime.Config{
CORS: viper.GetStringSlice(corsFlagName),
GrpcCertPath: viper.GetString(grpcCertPathFlagName),
MetricsPort: viper.GetInt32(metricsPortFlagName),
ProviderArgs: viper.GetStringMapString(providerArgsFlagName),
ServiceCertPath: viper.GetString(serverCertPathFlagName),
Expand Down
10 changes: 5 additions & 5 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ Config file expects the keys to have the exact naming as the flags.

Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation:

| Sync | Pattern | Example |
|------------|------------------------------------|---------------------------------------|
| Sync | Pattern | Example |
|------------|---------------------------------------|---------------------------------------|
| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc(s)://flag-source-url` | `grpc://my-flags-server` |



Expand Down
1 change: 1 addition & 0 deletions docs/configuration/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ flagd start [flags]
-b, --bearer-token string Set a bearer token to use for remote sync
-C, --cors-origin strings CORS allowed origins, * will allow all origins
-e, --evaluator string DEPRECATED: Set an evaluator e.g. json, yaml/yml.Please note that yaml/yml and json evaluations work the same (yaml/yml files are converted to json internally) (default "json")
-g, --grpc-sync-cert-path string Path to root certificate to be used by TLS enabled grpc sync (grpcs://). If TLS is used and this configuration is ignored, TLS uses the host's root CA set.
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --metrics-port int32 Port to serve metrics on (default 8014)
Expand Down
19 changes: 11 additions & 8 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ import (
)

var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regFile *regexp.Regexp
regCrd *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCS *regexp.Regexp
regFile *regexp.Regexp
regURL *regexp.Regexp
)

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCS = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}

Expand Down Expand Up @@ -101,17 +103,18 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
Cron: cron.New(),
})
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri))
case regGRPC.Match(uriB):
case regGRPC.Match(uriB), regGRPCS.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
CertPath: r.config.GrpcCertPath,
Source: uri,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})
default:
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+
" or 'core.openfeature.dev'", uri)
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc(s)://'"+
", or 'core.openfeature.dev'", uri)
}
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Config struct {
RemoteSyncType string
SyncBearerToken string

GrpcCertPath string

CORS []string
}

Expand Down
139 changes: 103 additions & 36 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"os"
"strings"
"time"

"google.golang.org/grpc/credentials"

"google.golang.org/grpc/credentials/insecure"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
Expand All @@ -18,50 +23,77 @@ import (
)

const (
// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
// Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote
// URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
PrefixSecure = "grpcs://"

// Connection retry constants
// Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach
// backOffLimit, retry delay fallback to constantBackOffDelay
backOffLimit = 3
backOffBase = 4
constantBackOffDelay = 60

tlsVersion = tls.VersionTLS12
)

type Sync struct {
Target string
ProviderID string
CertPath string
Logger *logger.Logger
ProviderID string
Source string

// rpcCon is a reusable grpc client connection. Lazy initialization by waiting for runtime to call Sync
rpcCon *grpc.ClientConn
}

// Sync initialize internals and start internal sync implementation
func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
tCredentials, err := buildTransportCredentials(g.Source, g.CertPath)
if err != nil {
g.Logger.Error(fmt.Sprintf("error building transport credentials: %s", err.Error()))
Comment thread
Kavindu-Dodan marked this conversation as resolved.
return err
}

target, ok := sourceToGRPCTarget(g.Source)
if !ok {
return fmt.Errorf("invalid grpc source: %s", g.Source)
}

// initial dial and connection. Failure here must result in a startup failure
dial, err := grpc.DialContext(ctx, g.Target, options...)
// Derive reusable client connection
g.rpcCon, err = grpc.DialContext(ctx, target, grpc.WithTransportCredentials(tCredentials))
if err != nil {
g.Logger.Error(fmt.Sprintf("error establishing grpc connection: %s", err.Error()))
g.Logger.Error(fmt.Sprintf("error initiating grpc client connection: %s", err.Error()))
Comment thread
Kavindu-Dodan marked this conversation as resolved.
return err
}

serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
// Cleanup when exiting the sync
defer func(rpcCon *grpc.ClientConn) {
err := rpcCon.Close()
if err != nil {
g.Logger.Warn(fmt.Sprintf("error while closing the client connection: %s", err.Error()))
Comment thread
Kavindu-Dodan marked this conversation as resolved.
}
}(g.rpcCon)
return g.syncInternal(ctx, dataSync)
}

// syncInternal connects to grpc stream and push updates through sync channel. It attempts to reconnect if connection
// fails. However, initial connection must be successful. This makes sure provided configurations are valid.
func (g *Sync) syncInternal(ctx context.Context, dataSync chan<- sync.DataSync) error {
serviceClient := syncv1grpc.NewFlagSyncServiceClient(g.rpcCon)
syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error()))
g.Logger.Error(fmt.Sprintf("error initializing the client: %s", err.Error()))
Comment thread
Kavindu-Dodan marked this conversation as resolved.
return err
}

// initial stream listening
err = g.handleFlagSync(syncClient, dataSync)
g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))

// retry connection establishment
for {
syncClient, ok := g.connectWithRetry(ctx, options...)
syncClient, ok := g.connectWithRetry(ctx)
if !ok {
// We shall exit
return nil
Expand All @@ -79,9 +111,7 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// a successful connection is established. Caller must not expect an error. Hence, errors are handled, logged
// internally. However, if the provided context is done, method exit with a non-ok state which must be verified by the
// caller
func (g *Sync) connectWithRetry(
ctx context.Context, options ...grpc.DialOption,
) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) {
func (g *Sync) connectWithRetry(ctx context.Context) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) {
var iteration int

for {
Expand All @@ -102,22 +132,16 @@ func (g *Sync) connectWithRetry(
return nil, false
}

g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target))

dial, err := grpc.DialContext(ctx, g.Target, options...)
if err != nil {
g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error()))
continue
}
g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc source: %s", g.Source))

serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
serviceClient := syncv1grpc.NewFlagSyncServiceClient(g.rpcCon)
syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Debug(fmt.Sprintf("error opening service client: %s", err.Error()))
continue
}

g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target))
g.Logger.Info(fmt.Sprintf("connection re-established with grpc source: %s", g.Source))
return syncClient, true
}
}
Expand All @@ -134,31 +158,31 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.Source,
Type: sync.ALL,
}

g.Logger.Debug("received full configuration payload")
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.Source,
Type: sync.ADD,
}

g.Logger.Debug("received an add payload")
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.Source,
Type: sync.UPDATE,
}

g.Logger.Debug("received an update payload")
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.Source,
Type: sync.DELETE,
}

Expand All @@ -171,14 +195,57 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
}
}

// URLToGRPCTarget is a helper to derive GRPC target from a provided URL
// buildTransportCredentials is a helper to build grpc credentials.TransportCredentials based on source and cert path
func buildTransportCredentials(source string, certPath string) (credentials.TransportCredentials, error) {
if strings.Contains(source, Prefix) {
return insecure.NewCredentials(), nil
}

if !strings.Contains(source, PrefixSecure) {
return nil, fmt.Errorf("invalid source. grpc source must contain prefix %s or %s", Prefix, PrefixSecure)
}

if certPath == "" {
// Rely on CA certs provided from system
return credentials.NewTLS(&tls.Config{MinVersion: tlsVersion}), nil
}

// Rely on provided certificate
certBytes, err := os.ReadFile(certPath)
if err != nil {
return nil, err
}

cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(certBytes) {
return nil, fmt.Errorf("invalid certificate provided at path: %s", certPath)
}

return credentials.NewTLS(&tls.Config{
MinVersion: tlsVersion,
RootCAs: cp,
}), nil
}

// sourceToGRPCTarget is a helper to derive GRPC target from a provided URL
// For example, function returns the target localhost:9090 for the input grpc://localhost:9090
func URLToGRPCTarget(url string) string {
index := strings.Split(url, Prefix)
func sourceToGRPCTarget(url string) (string, bool) {
var separator string

switch {
case strings.Contains(url, Prefix):
separator = Prefix
case strings.Contains(url, PrefixSecure):
separator = PrefixSecure
default:
return "", false
}

index := strings.Split(url, separator)

if len(index) == 2 {
return index[1]
if len(index) == 2 && len(index[1]) != 0 {
return index[1], true
}

return index[0]
return "", false
}
Loading