diff --git a/pkg/eval/ievaluator.go b/pkg/eval/ievaluator.go index e6fb652d1..071d9fadc 100644 --- a/pkg/eval/ievaluator.go +++ b/pkg/eval/ievaluator.go @@ -1,6 +1,7 @@ package eval import ( + "github.com/open-feature/flagd/pkg/sync" "google.golang.org/protobuf/types/known/structpb" ) @@ -40,7 +41,7 @@ do parsing and validation of the flag state and evaluate flags in response to ha */ type IEvaluator interface { GetState() (string, error) - SetState(source string, state string) (map[string]interface{}, error) + SetState(payload sync.DataSync) (map[string]interface{}, error) ResolveBooleanValue( reqID string, diff --git a/pkg/eval/json_evaluator.go b/pkg/eval/json_evaluator.go index f908832db..ed83c2cd3 100644 --- a/pkg/eval/json_evaluator.go +++ b/pkg/eval/json_evaluator.go @@ -6,8 +6,11 @@ import ( "errors" "fmt" "regexp" + "strconv" "strings" + "github.com/open-feature/flagd/pkg/sync" + "github.com/diegoholiveira/jsonlogic/v3" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/model" @@ -42,6 +45,9 @@ func NewJSONEvaluator(logger *logger.Logger) *JSONEvaluator { zap.String("component", "evaluator"), zap.String("evaluator", "json"), ), + state: Flags{ + Flags: map[string]Flag{}, + }, } jsonlogic.AddOperator("fractionalEvaluation", ev.fractionalEvaluation) return &ev @@ -55,36 +61,25 @@ func (je *JSONEvaluator) GetState() (string, error) { return string(data), nil } -func (je *JSONEvaluator) SetState(source string, state string) (map[string]interface{}, error) { - schemaLoader := gojsonschema.NewStringLoader(schema.FlagdDefinitions) - flagStringLoader := gojsonschema.NewStringLoader(state) - result, err := gojsonschema.Validate(schemaLoader, flagStringLoader) - - if err != nil { - return nil, err - } else if !result.Valid() { - err := errors.New("invalid JSON file") - return nil, err - } - - state, err = je.transposeEvaluators(state) - if err != nil { - return nil, fmt.Errorf("transpose evaluators: %w", err) - } - +func (je *JSONEvaluator) SetState(payload sync.DataSync) (map[string]interface{}, error) { var newFlags Flags - err = json.Unmarshal([]byte(state), &newFlags) + err := je.configToFlags(payload.FlagData, &newFlags) if err != nil { - return nil, fmt.Errorf("unmarshal new state: %w", err) - } - if err := validateDefaultVariants(newFlags); err != nil { return nil, err } - s, notifications := je.state.Merge(je.Logger, source, newFlags) - je.state = s - - return notifications, nil + switch payload.Type { + case sync.ALL: + return je.state.Merge(je.Logger, payload.Source, newFlags), nil + case sync.ADD: + return je.state.Add(je.Logger, payload.Source, newFlags), nil + case sync.UPDATE: + return je.state.Update(je.Logger, payload.Source, newFlags), nil + case sync.DELETE: + return je.state.Delete(je.Logger, payload.Source, newFlags), nil + default: + return nil, fmt.Errorf("unsupported sync type: %d", payload.Type) + } } func resolve[T constraints](reqID string, key string, context *structpb.Struct, @@ -274,8 +269,36 @@ func (je *JSONEvaluator) evaluateVariant( return je.state.Flags[flagKey].DefaultVariant, reason, nil } +// configToFlags convert string configurations to flags and store them to pointer newFlags +func (je *JSONEvaluator) configToFlags(config string, newFlags *Flags) error { + schemaLoader := gojsonschema.NewStringLoader(schema.FlagdDefinitions) + flagStringLoader := gojsonschema.NewStringLoader(config) + + result, err := gojsonschema.Validate(schemaLoader, flagStringLoader) + if err != nil { + return err + } else if !result.Valid() { + return fmt.Errorf("JSON schema validation failed: %s", buildErrorString(result.Errors())) + } + + transposedConfig, err := je.transposeEvaluators(config) + if err != nil { + return fmt.Errorf("transposing evaluators: %w", err) + } + + err = json.Unmarshal([]byte(transposedConfig), &newFlags) + if err != nil { + return fmt.Errorf("unmarshalling provided configurations: %w", err) + } + if err := validateDefaultVariants(newFlags); err != nil { + return err + } + + return nil +} + // validateDefaultVariants returns an error if any of the default variants aren't valid -func validateDefaultVariants(flags Flags) error { +func validateDefaultVariants(flags *Flags) error { for name, flag := range flags.Flags { if _, ok := flag.Variants[flag.DefaultVariant]; !ok { return fmt.Errorf( @@ -315,3 +338,18 @@ func (je *JSONEvaluator) transposeEvaluators(state string) (string, error) { return state, nil } + +// buildErrorString efficiently converts json schema errors to a formatted string, usable for logging +func buildErrorString(errors []gojsonschema.ResultError) string { + var builder strings.Builder + + for i, err := range errors { + builder.WriteByte(' ') + builder.WriteString(strconv.Itoa(i + 1)) + builder.WriteByte(':') + builder.WriteString(err.String()) + builder.WriteByte(' ') + } + + return builder.String() +} diff --git a/pkg/eval/json_evaluator_model.go b/pkg/eval/json_evaluator_model.go index 1ca4e3093..67751d3c8 100644 --- a/pkg/eval/json_evaluator_model.go +++ b/pkg/eval/json_evaluator_model.go @@ -8,21 +8,114 @@ import ( "github.com/open-feature/flagd/pkg/logger" ) -type Flags struct { - Flags map[string]Flag `json:"flags"` +type Flag struct { + State string `json:"state"` + DefaultVariant string `json:"defaultVariant"` + Variants map[string]any `json:"variants"` + Targeting json.RawMessage `json:"targeting,omitempty"` + Source string `json:"source"` } type Evaluators struct { Evaluators map[string]json.RawMessage `json:"$evaluators"` } -func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) (Flags, map[string]interface{}) { +type Flags struct { + Flags map[string]Flag `json:"flags"` +} + +// Add new flags from source. The implementation is not thread safe +func (f Flags) Add(logger *logger.Logger, source string, ff Flags) map[string]interface{} { + notifications := map[string]interface{}{} + + for k, newFlag := range ff.Flags { + if storedFlag, ok := f.Flags[k]; ok && storedFlag.Source != source { + logger.Warn(fmt.Sprintf( + "flag with key %s from source %s already exist, overriding this with flag from source %s", + k, + storedFlag.Source, + source, + )) + } + + notifications[k] = map[string]interface{}{ + "type": string(NotificationCreate), + "source": source, + } + + // Store the new version of the flag + newFlag.Source = source + f.Flags[k] = newFlag + } + + return notifications +} + +// Update existing flags from source. The implementation is not thread safe +func (f Flags) Update(logger *logger.Logger, source string, ff Flags) map[string]interface{} { notifications := map[string]interface{}{} - result := Flags{Flags: make(map[string]Flag)} + + for k, flag := range ff.Flags { + if storedFlag, ok := f.Flags[k]; !ok { + logger.Warn( + fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exisit.", + k, + source)) + + continue + } else if storedFlag.Source != source { + logger.Warn(fmt.Sprintf( + "flag with key %s from source %s already exist, overriding this with flag from source %s", + k, + storedFlag.Source, + source, + )) + } + + notifications[k] = map[string]interface{}{ + "type": string(NotificationUpdate), + "source": source, + } + + flag.Source = source + f.Flags[k] = flag + } + + return notifications +} + +// Delete matching flags from source. The implementation is not thread safe +func (f Flags) Delete(logger *logger.Logger, source string, ff Flags) map[string]interface{} { + notifications := map[string]interface{}{} + + for k := range ff.Flags { + if _, ok := f.Flags[k]; ok { + notifications[k] = map[string]interface{}{ + "type": string(NotificationDelete), + "source": source, + } + + delete(f.Flags, k) + } else { + logger.Warn( + fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.", + k, + source)) + } + } + + return notifications +} + +// Merge provided flags from source with currently stored flags. The implementation is not thread safe +func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) map[string]interface{} { + notifications := map[string]interface{}{} + for k, v := range f.Flags { if v.Source == source { if _, ok := ff.Flags[k]; !ok { // flag has been deleted + delete(f.Flags, k) notifications[k] = map[string]interface{}{ "type": string(NotificationDelete), "source": source, @@ -30,23 +123,24 @@ func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) (Flags, map continue } } - result.Flags[k] = v } - for k, v := range ff.Flags { - v.Source = source - val, ok := result.Flags[k] + + for k, newFlag := range ff.Flags { + newFlag.Source = source + + storedFlag, ok := f.Flags[k] if !ok { notifications[k] = map[string]interface{}{ "type": string(NotificationCreate), "source": source, } - } else if !reflect.DeepEqual(val, v) { - if val.Source != source { + } else if !reflect.DeepEqual(storedFlag, newFlag) { + if storedFlag.Source != source { logger.Warn( fmt.Sprintf( "key value: %s is duplicated across multiple sources this can lead to unexpected behavior: %s, %s", k, - val.Source, + storedFlag.Source, source, ), ) @@ -56,15 +150,10 @@ func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) (Flags, map "source": source, } } - result.Flags[k] = v + + // Store the new version of the flag + f.Flags[k] = newFlag } - return result, notifications -} -type Flag struct { - State string `json:"state"` - DefaultVariant string `json:"defaultVariant"` - Variants map[string]any `json:"variants"` - Targeting json.RawMessage `json:"targeting,omitempty"` - Source string `json:"source"` + return notifications } diff --git a/pkg/eval/json_evaluator_model_test.go b/pkg/eval/json_evaluator_model_test.go new file mode 100644 index 000000000..b4f7dc77a --- /dev/null +++ b/pkg/eval/json_evaluator_model_test.go @@ -0,0 +1,411 @@ +package eval + +import ( + "testing" + + "github.com/open-feature/flagd/pkg/logger" + "github.com/stretchr/testify/require" +) + +func TestMergeFlags(t *testing.T) { + t.Parallel() + tests := []struct { + name string + current Flags + new Flags + newSource string + want Flags + wantNotifs map[string]interface{} + }{ + { + name: "both nil", + current: Flags{Flags: nil}, + new: Flags{Flags: nil}, + want: Flags{Flags: map[string]Flag{}}, + wantNotifs: map[string]interface{}{}, + }, + { + name: "both empty flags", + current: Flags{Flags: map[string]Flag{}}, + new: Flags{Flags: map[string]Flag{}}, + want: Flags{Flags: map[string]Flag{}}, + wantNotifs: map[string]interface{}{}, + }, + { + name: "empty current", + current: Flags{Flags: nil}, + new: Flags{Flags: map[string]Flag{}}, + want: Flags{Flags: map[string]Flag{}}, + wantNotifs: map[string]interface{}{}, + }, + { + name: "empty new", + current: Flags{Flags: map[string]Flag{}}, + new: Flags{Flags: nil}, + want: Flags{Flags: map[string]Flag{}}, + wantNotifs: map[string]interface{}{}, + }, + { + name: "extra fields on each", + current: Flags{Flags: map[string]Flag{ + "waka": { + DefaultVariant: "off", + Source: "1", + }, + }}, + new: Flags{Flags: map[string]Flag{ + "paka": { + DefaultVariant: "on", + }, + }}, + newSource: "2", + want: Flags{Flags: map[string]Flag{ + "waka": { + DefaultVariant: "off", + Source: "1", + }, + "paka": { + DefaultVariant: "on", + Source: "2", + }, + }}, + wantNotifs: map[string]interface{}{ + "paka": map[string]interface{}{"type": "write", "source": "2"}, + }, + }, + { + name: "override", + current: Flags{Flags: map[string]Flag{ + "waka": {DefaultVariant: "off"}, + }}, + new: Flags{Flags: map[string]Flag{ + "waka": {DefaultVariant: "on"}, + "paka": {DefaultVariant: "on"}, + }}, + want: Flags{Flags: map[string]Flag{ + "waka": {DefaultVariant: "on"}, + "paka": {DefaultVariant: "on"}, + }}, + wantNotifs: map[string]interface{}{ + "waka": map[string]interface{}{"type": "update", "source": ""}, + "paka": map[string]interface{}{"type": "write", "source": ""}, + }, + }, + { + name: "identical", + current: Flags{Flags: map[string]Flag{ + "hello": {DefaultVariant: "off"}, + }}, + new: Flags{Flags: map[string]Flag{ + "hello": {DefaultVariant: "off"}, + }}, + want: Flags{Flags: map[string]Flag{ + "hello": {DefaultVariant: "off"}, + }}, + wantNotifs: map[string]interface{}{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + gotNotifs := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.new) + require.Equal(t, tt.want, tt.want) + require.Equal(t, tt.wantNotifs, gotNotifs) + }) + } +} + +func TestFlags_Add(t *testing.T) { + mockLogger := logger.NewLogger(nil, false) + mockSource := "source" + mockOverrideSource := "source-2" + + type request struct { + source string + flags Flags + } + + tests := []struct { + name string + storedState Flags + addRequest request + expectedState Flags + expectedNotificationKeys []string + }{ + { + name: "Add success", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + addRequest: request{ + source: mockSource, + flags: Flags{ + Flags: map[string]Flag{ + "B": {Source: mockSource}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + }, + }, + expectedNotificationKeys: []string{"B"}, + }, + { + name: "Add multiple success", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + addRequest: request{ + source: mockSource, + flags: Flags{ + Flags: map[string]Flag{ + "B": {Source: mockSource}, + "C": {Source: mockSource}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + "C": {Source: mockSource}, + }, + }, + expectedNotificationKeys: []string{"B", "C"}, + }, + { + name: "Add success - conflict and override", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + addRequest: request{ + source: mockOverrideSource, + flags: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockOverrideSource}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockOverrideSource}, + }, + }, + expectedNotificationKeys: []string{"A"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.flags) + + require.Equal(t, tt.storedState, tt.expectedState) + + for k := range messages { + require.Containsf(t, tt.expectedNotificationKeys, k, + "Message key %s not present in the expected key list", k) + } + }) + } +} + +func TestFlags_Update(t *testing.T) { + mockLogger := logger.NewLogger(nil, false) + mockSource := "source" + mockOverrideSource := "source-2" + + type request struct { + source string + flags Flags + } + + tests := []struct { + name string + storedState Flags + UpdateRequest request + expectedState Flags + expectedNotificationKeys []string + }{ + { + name: "Update success", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "True"}, + }, + }, + UpdateRequest: request{ + source: mockSource, + flags: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "False"}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "False"}, + }, + }, + expectedNotificationKeys: []string{"A"}, + }, + { + name: "Update multiple success", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "True"}, + "B": {Source: mockSource, DefaultVariant: "True"}, + }, + }, + UpdateRequest: request{ + source: mockSource, + flags: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "False"}, + "B": {Source: mockSource, DefaultVariant: "False"}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "False"}, + "B": {Source: mockSource, DefaultVariant: "False"}, + }, + }, + expectedNotificationKeys: []string{"A", "B"}, + }, + { + name: "Update success - conflict and override", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource, DefaultVariant: "True"}, + }, + }, + UpdateRequest: request{ + source: mockOverrideSource, + flags: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockOverrideSource, DefaultVariant: "True"}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockOverrideSource, DefaultVariant: "True"}, + }, + }, + expectedNotificationKeys: []string{"A"}, + }, + { + name: "Update fail", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + UpdateRequest: request{ + source: mockSource, + flags: Flags{ + Flags: map[string]Flag{ + "B": {Source: mockSource}, + }, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + expectedNotificationKeys: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source, tt.UpdateRequest.flags) + + require.Equal(t, tt.storedState, tt.expectedState) + + for k := range messages { + require.Containsf(t, tt.expectedNotificationKeys, k, + "Message key %s not present in the expected key list", k) + } + }) + } +} + +func TestFlags_Delete(t *testing.T) { + mockLogger := logger.NewLogger(nil, false) + mockSource := "source" + + tests := []struct { + name string + storedState Flags + deleteRequest Flags + expectedState Flags + expectedNotificationKeys []string + }{ + { + name: "Remove success", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + }, + }, + deleteRequest: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "B": {Source: mockSource}, + }, + }, + expectedNotificationKeys: []string{"A"}, + }, + { + name: "Nothing to remove", + storedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + }, + }, + deleteRequest: Flags{ + Flags: map[string]Flag{ + "C": {Source: mockSource}, + }, + }, + expectedState: Flags{ + Flags: map[string]Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + }, + }, + expectedNotificationKeys: []string{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + messages := tt.storedState.Delete(mockLogger, mockSource, tt.deleteRequest) + + require.Equal(t, tt.storedState, tt.expectedState) + + for k := range messages { + require.Containsf(t, tt.expectedNotificationKeys, k, + "Message key %s not present in the expected key list", k) + } + }) + } +} diff --git a/pkg/eval/json_evaluator_test.go b/pkg/eval/json_evaluator_test.go index 866f8fb27..1b659207c 100644 --- a/pkg/eval/json_evaluator_test.go +++ b/pkg/eval/json_evaluator_test.go @@ -10,8 +10,8 @@ import ( "github.com/open-feature/flagd/pkg/eval" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/model" + "github.com/open-feature/flagd/pkg/sync" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/structpb" ) @@ -275,8 +275,8 @@ var Flags = fmt.Sprintf(`{ DisabledFlag) func TestGetState_Valid_ContainsFlag(t *testing.T) { - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", ValidFlags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags}) if err != nil { t.Fatalf("Expected no error") } @@ -295,28 +295,28 @@ func TestGetState_Valid_ContainsFlag(t *testing.T) { } func TestSetState_Invalid_Error(t *testing.T) { - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) // set state with an invalid flag definition - _, err := evaluator.SetState("", InvalidFlags) + _, err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags}) if err == nil { t.Fatalf("expected error") } } func TestSetState_Valid_NoError(t *testing.T) { - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) // set state with a valid flag definition - _, err := evaluator.SetState("", ValidFlags) + _, err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags}) if err != nil { t.Fatalf("expected no error") } } func TestResolveAllValues(t *testing.T) { - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -375,8 +375,8 @@ func TestResolveBooleanValue(t *testing.T) { {DisabledFlag, nil, StaticBoolValue, model.ErrorReason, model.FlagDisabledErrorCode}, } const reqID = "default" - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -414,8 +414,8 @@ func BenchmarkResolveBooleanValue(b *testing.B) { {DisabledFlag, nil, StaticBoolValue, model.ErrorReason, model.FlagDisabledErrorCode}, } - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { b.Fatalf("expected no error") } @@ -458,8 +458,8 @@ func TestResolveStringValue(t *testing.T) { {DisabledFlag, nil, "", model.ErrorReason, model.FlagDisabledErrorCode}, } const reqID = "default" - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -498,8 +498,8 @@ func BenchmarkResolveStringValue(b *testing.B) { {DisabledFlag, nil, "", model.ErrorReason, model.FlagDisabledErrorCode}, } - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { b.Fatalf("expected no error") } @@ -542,8 +542,8 @@ func TestResolveFloatValue(t *testing.T) { {DisabledFlag, nil, 0, model.ErrorReason, model.FlagDisabledErrorCode}, } const reqID = "default" - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -582,8 +582,8 @@ func BenchmarkResolveFloatValue(b *testing.B) { {DisabledFlag, nil, 0, model.ErrorReason, model.FlagDisabledErrorCode}, } - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { b.Fatalf("expected no error") } @@ -626,8 +626,8 @@ func TestResolveIntValue(t *testing.T) { {DisabledFlag, nil, 0, model.ErrorReason, model.FlagDisabledErrorCode}, } const reqID = "default" - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -666,8 +666,8 @@ func BenchmarkResolveIntValue(b *testing.B) { {DisabledFlag, nil, 0, model.ErrorReason, model.FlagDisabledErrorCode}, } - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { b.Fatalf("expected no error") } @@ -710,8 +710,8 @@ func TestResolveObjectValue(t *testing.T) { {DisabledFlag, nil, "{}", model.ErrorReason, model.FlagDisabledErrorCode}, } const reqID = "default" - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { t.Fatalf("expected no error") } @@ -753,8 +753,8 @@ func BenchmarkResolveObjectValue(b *testing.B) { {DisabledFlag, nil, "{}", model.ErrorReason, model.FlagDisabledErrorCode}, } - evaluator := eval.JSONEvaluator{Logger: logger.NewLogger(nil, false)} - _, err := evaluator.SetState("", Flags) + evaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) + _, err := evaluator.SetState(sync.DataSync{FlagData: Flags}) if err != nil { b.Fatalf("expected no error") } @@ -785,116 +785,6 @@ func BenchmarkResolveObjectValue(b *testing.B) { } } -func TestMergeFlags(t *testing.T) { - t.Parallel() - tests := []struct { - name string - current eval.Flags - new eval.Flags - newSource string - want eval.Flags - wantNotifs map[string]interface{} - }{ - { - name: "both nil", - current: eval.Flags{Flags: nil}, - new: eval.Flags{Flags: nil}, - want: eval.Flags{Flags: map[string]eval.Flag{}}, - wantNotifs: map[string]interface{}{}, - }, - { - name: "both empty flags", - current: eval.Flags{Flags: map[string]eval.Flag{}}, - new: eval.Flags{Flags: map[string]eval.Flag{}}, - want: eval.Flags{Flags: map[string]eval.Flag{}}, - wantNotifs: map[string]interface{}{}, - }, - { - name: "empty current", - current: eval.Flags{Flags: nil}, - new: eval.Flags{Flags: map[string]eval.Flag{}}, - want: eval.Flags{Flags: map[string]eval.Flag{}}, - wantNotifs: map[string]interface{}{}, - }, - { - name: "empty new", - current: eval.Flags{Flags: map[string]eval.Flag{}}, - new: eval.Flags{Flags: nil}, - want: eval.Flags{Flags: map[string]eval.Flag{}}, - wantNotifs: map[string]interface{}{}, - }, - { - name: "extra fields on each", - current: eval.Flags{Flags: map[string]eval.Flag{ - "waka": { - DefaultVariant: "off", - Source: "1", - }, - }}, - new: eval.Flags{Flags: map[string]eval.Flag{ - "paka": { - DefaultVariant: "on", - }, - }}, - newSource: "2", - want: eval.Flags{Flags: map[string]eval.Flag{ - "waka": { - DefaultVariant: "off", - Source: "1", - }, - "paka": { - DefaultVariant: "on", - Source: "2", - }, - }}, - wantNotifs: map[string]interface{}{ - "paka": map[string]interface{}{"type": "write", "source": "2"}, - }, - }, - { - name: "override", - current: eval.Flags{Flags: map[string]eval.Flag{ - "waka": {DefaultVariant: "off"}, - }}, - new: eval.Flags{Flags: map[string]eval.Flag{ - "waka": {DefaultVariant: "on"}, - "paka": {DefaultVariant: "on"}, - }}, - want: eval.Flags{Flags: map[string]eval.Flag{ - "waka": {DefaultVariant: "on"}, - "paka": {DefaultVariant: "on"}, - }}, - wantNotifs: map[string]interface{}{ - "waka": map[string]interface{}{"type": "update", "source": ""}, - "paka": map[string]interface{}{"type": "write", "source": ""}, - }, - }, - { - name: "identical", - current: eval.Flags{Flags: map[string]eval.Flag{ - "hello": {DefaultVariant: "off"}, - }}, - new: eval.Flags{Flags: map[string]eval.Flag{ - "hello": {DefaultVariant: "off"}, - }}, - want: eval.Flags{Flags: map[string]eval.Flag{ - "hello": {DefaultVariant: "off"}, - }}, - wantNotifs: map[string]interface{}{}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - got, gotNotifs := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.new) - require.Equal(t, tt.want, got) - require.Equal(t, tt.wantNotifs, gotNotifs) - }) - } -} - func TestSetState_DefaultVariantValidation(t *testing.T) { tests := map[string]struct { jsonFlags string @@ -946,9 +836,9 @@ func TestSetState_DefaultVariantValidation(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { - jsonEvaluator := eval.JSONEvaluator{} + jsonEvaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) - _, err := jsonEvaluator.SetState("", tt.jsonFlags) + _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.jsonFlags}) if tt.valid && err != nil { t.Error(err) @@ -1141,9 +1031,9 @@ func TestState_Evaluator(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { - jsonEvaluator := eval.JSONEvaluator{} + jsonEvaluator := eval.NewJSONEvaluator(logger.NewLogger(nil, false)) - _, err := jsonEvaluator.SetState("", tt.inputState) + _, err := jsonEvaluator.SetState(sync.DataSync{FlagData: tt.inputState}) if err != nil { if !tt.expectedError { t.Error(err) diff --git a/pkg/eval/mock/ievaluator.go b/pkg/eval/mock/ievaluator.go index bab5cbe31..80ddd3a92 100644 --- a/pkg/eval/mock/ievaluator.go +++ b/pkg/eval/mock/ievaluator.go @@ -9,6 +9,7 @@ import ( gomock "github.com/golang/mock/gomock" eval "github.com/open-feature/flagd/pkg/eval" + sync "github.com/open-feature/flagd/pkg/sync" structpb "google.golang.org/protobuf/types/known/structpb" ) @@ -150,16 +151,16 @@ func (mr *MockIEvaluatorMockRecorder) ResolveStringValue(reqID, flagKey, context } // SetState mocks base method. -func (m *MockIEvaluator) SetState(source, state string) (map[string]interface{}, error) { +func (m *MockIEvaluator) SetState(payload sync.DataSync) (map[string]interface{}, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetState", source, state) + ret := m.ctrl.Call(m, "SetState", payload) ret0, _ := ret[0].(map[string]interface{}) ret1, _ := ret[1].(error) return ret0, ret1 } // SetState indicates an expected call of SetState. -func (mr *MockIEvaluatorMockRecorder) SetState(source, state interface{}) *gomock.Call { +func (mr *MockIEvaluatorMockRecorder) SetState(payload interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetState", reflect.TypeOf((*MockIEvaluator)(nil).SetState), source, state) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetState", reflect.TypeOf((*MockIEvaluator)(nil).SetState), payload) } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 281c8b996..41b468c68 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -89,11 +89,11 @@ func (r *Runtime) Start() error { } // updateWithNotify helps to update state and notify listeners -func (r *Runtime) updateWithNotify(data sync.DataSync) { +func (r *Runtime) updateWithNotify(payload sync.DataSync) { r.mu.Lock() defer r.mu.Unlock() - notifications, err := r.Evaluator.SetState(data.Source, data.FlagData) + notifications, err := r.Evaluator.SetState(payload) if err != nil { r.Logger.Error(err.Error()) return diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index c6128c477..8b05b4c03 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -44,7 +44,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - dataSync <- sync.DataSync{FlagData: fetch, Source: fs.URI} + dataSync <- sync.DataSync{FlagData: fetch, Source: fs.URI, Type: sync.ALL} fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { @@ -92,7 +92,7 @@ func (fs *Sync) sendDataSync(ctx context.Context, eventType fsnotify.Event, data fs.Logger.Error(fmt.Sprintf("Error fetching after %s notification: %s", eventType.Op.String(), err.Error())) } - dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI} + dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: sync.ALL} } func (fs *Sync) fetch(_ context.Context) (string, error) { diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 477981f6f..ae8326f62 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -44,7 +44,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI} + dataSync <- sync.DataSync{FlagData: fetch, Source: hs.URI, Type: sync.ALL} _ = hs.Cron.AddFunc("*/5 * * * *", func() { body, err := hs.fetchBodyFromURL(ctx, hs.URI) @@ -62,7 +62,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { if err != nil { hs.Logger.Error(fmt.Sprintf("error fetching: %s", err.Error())) } else { - dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI} + dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL} } } else { currentSHA := hs.generateSha(body) @@ -72,7 +72,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { if err != nil { hs.Logger.Error(fmt.Sprintf("error fetching: %s", err.Error())) } else { - dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI} + dataSync <- sync.DataSync{FlagData: msg, Source: hs.URI, Type: sync.ALL} } } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 51c46f4d7..f1db9cc3a 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -2,13 +2,26 @@ package sync import "context" +type ProviderArgs map[string]string + +type Type int + +// Type of the sync operation +const ( + // ALL - All flags of sync provider. This is the default if unset due to primitive default + ALL Type = iota + // ADD - Additional flags from sync provider + ADD + // UPDATE - Update for flag(s) previously provided + UPDATE + // DELETE - Delete for flag(s) previously provided + DELETE +) + /* ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest value and communicate to the Runtime with DataSync channel */ - -type ProviderArgs map[string]string - type ISync interface { // Sync is the contract between Runtime and sync implementation. // Note that, it is expected to return the first data sync as soon as possible to fill the store. @@ -19,4 +32,5 @@ type ISync interface { type DataSync struct { FlagData string Source string + Type } diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 3fd891654..f7f094cb1 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -40,7 +40,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI} + dataSync <- sync.DataSync{FlagData: fetch, Source: k.URI, Type: sync.ALL} notifies := make(chan INotify) @@ -60,7 +60,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { continue } - dataSync <- sync.DataSync{FlagData: msg, Source: k.URI} + dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} case DefaultEventTypeModify: k.Logger.Debug("Configuration modified") msg, err := k.fetch(ctx) @@ -69,7 +69,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { continue } - dataSync <- sync.DataSync{FlagData: msg, Source: k.URI} + dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} case DefaultEventTypeDelete: k.Logger.Debug("configuration deleted") case DefaultEventTypeReady: