Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d79dafe
wip
james-milligan Feb 28, 2023
ee8d823
docs
james-milligan Feb 28, 2023
35a84b5
docs
james-milligan Feb 28, 2023
e88e47b
linting
james-milligan Feb 28, 2023
484814f
docs
james-milligan Feb 28, 2023
cd3ad8c
Merge branch 'main' into sync-provider-2
james-milligan Feb 28, 2023
f8aef13
bug fix
james-milligan Feb 28, 2023
36c1223
Merge branch 'sync-provider-2' of https://github.com/james-milligan/f…
james-milligan Feb 28, 2023
7ee052d
Merge branch 'main' into sync-provider-2
james-milligan Feb 28, 2023
71f09e7
test coverage
james-milligan Mar 1, 2023
eb21c91
Merge branch 'sync-provider-2' of https://github.com/james-milligan/f…
james-milligan Mar 1, 2023
2d23994
Merge branch 'main' into sync-provider-2
james-milligan Mar 1, 2023
a0c844e
Apply suggestions from code review
james-milligan Mar 1, 2023
c4dee0e
name update
james-milligan Mar 1, 2023
5ecdb19
Merge branch 'main' into sync-provider-2
james-milligan Mar 1, 2023
c5864ac
merge conflicts
james-milligan Mar 2, 2023
73ce052
conflict fix
james-milligan Mar 2, 2023
5c3942e
conflict fix
james-milligan Mar 2, 2023
de6b6ba
bug fix
james-milligan Mar 2, 2023
2c6980d
Merge branch 'sync-provider-2' of https://github.com/james-milligan/f…
james-milligan Mar 2, 2023
86d392d
doc fix
james-milligan Mar 2, 2023
3e7c41a
deprecation warning
james-milligan Mar 2, 2023
1f7f7ae
removed config object from syncs
james-milligan Mar 7, 2023
a522f56
rename sync providers to sources
james-milligan Mar 7, 2023
5019af8
reintroduce deprecation of --sync-provider
james-milligan Mar 7, 2023
33644c3
rename func
james-milligan Mar 7, 2023
d0d77f1
conflict fix
james-milligan Mar 7, 2023
d7901ff
wip
james-milligan Feb 28, 2023
dbc886e
linting
james-milligan Feb 28, 2023
09c6d13
init grpc tls con
Kavindu-Dodan Mar 6, 2023
d17b08e
Merge branch 'main' into feat/grpc-tlc-con
Kavindu-Dodan Mar 9, 2023
6577362
resolve merge conflicts
Kavindu-Dodan Mar 9, 2023
7cacd8e
Merge branch 'main' into feat/grpc-tlc-con
Kavindu-Dodan Mar 9, 2023
a77be56
fix naming
Kavindu-Dodan Mar 9, 2023
58e1690
Merge branch 'main' into feat/grpc-tlc-con
Kavindu-Dodan Mar 9, 2023
8a69ba8
fix retry con and tests to validate
Kavindu-Dodan Mar 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ Config file expects the keys to have the exact naming as the flags.

Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation:

| Sync | Pattern | Example |
|------------|------------------------------------|---------------------------------------|
| Sync | Pattern | Example |
|------------|---------------------------------------|---------------------------------------|
| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` |
| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` |
| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` |
| Grpc | `grpc(s)://flag-source-url` | `grpc://my-flags-server` |


### Customising sync providers
Expand All @@ -42,11 +42,12 @@ While a URI may be passed to flagd via the `--uri` flag, some implementations ma
The flag takes a string argument, which should be a JSON representation of an array of `SourceConfig` objects. Alternatively, these configurations should be passed to
flagd via config file, specified using the `--config` flag.

| Field | Type |
|------------|------------------------------------|
| uri | required `string` | |
| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) |
| bearerToken | optional `string` |
| Field | Type | Note |
|-------------|------------------------------------------------------------|----------------------------------------------------|
| uri | required `string` | |
| provider | required `string` (`file`, `kubernetes`, `http` or `grpc`) | |
| bearerToken | optional `string` | Used for http sync |
| certPath | optional `string` | Used for grpcs sync when TLS certificate is needed |

The `uri` field values do not need to follow the [URI patterns](#uri-patterns), the provider type is instead derived from the provider field. If the prefix is supplied, it will be removed on startup without error.

Expand All @@ -68,4 +69,7 @@ sources:
provider: kubernetes
- uri: grpc://my-flag-source:8080
provider: grpc
- uri: grpcs://my-flag-source:8080
provider: grpc
certPath: /certs/ca.cert
```
15 changes: 9 additions & 6 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ const (
)

var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regFile *regexp.Regexp
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
)

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}

Expand Down Expand Up @@ -120,11 +122,12 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {

func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
Target: grpc.URLToGRPCTarget(config.URI),
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
CertPath: config.CertPath,
}
}

Expand Down Expand Up @@ -211,7 +214,7 @@ func SyncProvidersFromURIs(uris []string) ([]sync.SourceConfig, error) {
URI: uri,
Provider: syncProviderHTTP,
})
case regGRPC.Match(uriB):
case regGRPC.Match(uriB), regGRPCSecure.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{
URI: uri,
Provider: syncProviderGrpc,
Expand Down
141 changes: 95 additions & 46 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"os"
"strings"
msync "sync"
"time"

"google.golang.org/grpc/credentials"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"

Expand All @@ -18,47 +23,55 @@ import (
)

const (
// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
// Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote
// URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"
PrefixSecure = "grpcs://"

// Connection retry constants
// Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach
// backOffLimit, retry delay fallback to constantBackOffDelay
backOffLimit = 3
backOffBase = 4
constantBackOffDelay = 60

tlsVersion = tls.VersionTLS12
)

var once msync.Once

type Sync struct {
Target string
URI string
ProviderID string
CertPath string
Logger *logger.Logger
Mux *msync.RWMutex

syncClient syncv1grpc.FlagSyncService_SyncFlagsClient
client syncv1grpc.FlagSyncServiceClient
options []grpc.DialOption
ready bool
client syncv1grpc.FlagSyncServiceClient
ready bool
}

func (g *Sync) connectClient(ctx context.Context) error {
// initial dial and connection. Failure here must result in a startup failure
dial, err := grpc.DialContext(ctx, g.Target, g.options...)
func (g *Sync) Init(ctx context.Context) error {
tCredentials, err := buildTransportCredentials(g.URI, g.CertPath)
if err != nil {
g.Logger.Error(fmt.Sprintf("error building transport credentials: %s", err.Error()))
return err
}

g.client = syncv1grpc.NewFlagSyncServiceClient(dial)
target, ok := sourceToGRPCTarget(g.URI)
if !ok {
return fmt.Errorf("invalid grpc source: %s", g.URI)
}

syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
// Derive reusable client connection
rpcCon, err := grpc.DialContext(ctx, target, grpc.WithTransportCredentials(tCredentials))
if err != nil {
g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error()))
g.Logger.Error(fmt.Sprintf("error initiating grpc client connection: %s", err.Error()))
return err
}
g.syncClient = syncClient

// Setup service client
g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon)

return nil
}

Expand All @@ -70,30 +83,28 @@ func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error
}
dataSync <- sync.DataSync{
FlagData: res.GetFlagConfiguration(),
Source: g.Target,
Source: g.URI,
Type: sync.ALL,
}
return nil
}

func (g *Sync) Init(ctx context.Context) error {
g.options = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}

// initial dial and connection. Failure here must result in a startup failure
return g.connectClient(ctx)
}

func (g *Sync) IsReady() bool {
return g.ready
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// initial stream listening
err := g.handleFlagSync(g.syncClient, dataSync)
// Initialize SyncFlags client. This fails if server connection establishment fails (ex:- grpc server offline)
syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
return err
}

// Initial stream listening. Error will be logged and continue and retry connection establishment
err = g.handleFlagSync(syncClient, dataSync)
if err == nil {
return nil
// This should not happen as handleFlagSync expects to return with an error
Comment thread
toddbaert marked this conversation as resolved.
return err
}

g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))
Expand Down Expand Up @@ -141,20 +152,15 @@ func (g *Sync) connectWithRetry(
return nil, false
}

g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target))

if err := g.connectClient(ctx); err != nil {
g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error()))
continue
}
g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.URI))

syncClient, err := g.client.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Debug(fmt.Sprintf("error opening service client: %s", err.Error()))
continue
}

g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target))
g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.URI))
return syncClient, true
}
}
Expand All @@ -176,31 +182,31 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.ALL,
}

g.Logger.Debug("received full configuration payload")
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.ADD,
}

g.Logger.Debug("received an add payload")
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.UPDATE,
}

g.Logger.Debug("received an update payload")
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.Target,
Source: g.URI,
Type: sync.DELETE,
}

Expand All @@ -213,14 +219,57 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
}
}

// URLToGRPCTarget is a helper to derive GRPC target from a provided URL
// buildTransportCredentials is a helper to build grpc credentials.TransportCredentials based on source and cert path
func buildTransportCredentials(source string, certPath string) (credentials.TransportCredentials, error) {
if strings.Contains(source, Prefix) {
return insecure.NewCredentials(), nil
}

if !strings.Contains(source, PrefixSecure) {
return nil, fmt.Errorf("invalid source. grpc source must contain prefix %s or %s", Prefix, PrefixSecure)
}

if certPath == "" {
// Rely on CA certs provided from system
return credentials.NewTLS(&tls.Config{MinVersion: tlsVersion}), nil
}

// Rely on provided certificate
certBytes, err := os.ReadFile(certPath)
if err != nil {
return nil, err
}

cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(certBytes) {
return nil, fmt.Errorf("invalid certificate provided at path: %s", certPath)
}

return credentials.NewTLS(&tls.Config{
MinVersion: tlsVersion,
RootCAs: cp,
}), nil
}

// sourceToGRPCTarget is a helper to derive GRPC target from a provided URL
// For example, function returns the target localhost:9090 for the input grpc://localhost:9090
func URLToGRPCTarget(url string) string {
index := strings.Split(url, Prefix)
func sourceToGRPCTarget(url string) (string, bool) {
var separator string

switch {
case strings.Contains(url, Prefix):
separator = Prefix
case strings.Contains(url, PrefixSecure):
separator = PrefixSecure
default:
return "", false
}

index := strings.Split(url, separator)

if len(index) == 2 {
return index[1]
if len(index) == 2 && len(index[1]) != 0 {
return index[1], true
}

return index[0]
return "", false
}
Loading