From 8d7421decb61e842d91dc642e4e0a56455b20018 Mon Sep 17 00:00:00 2001 From: Prabhjot Singh Sethi Date: Sun, 20 Apr 2025 10:36:08 +0000 Subject: [PATCH] Add Observers capability to Provider Table Signed-off-by: Prabhjot Singh Sethi --- resource/bitmap.go | 14 ++++++ sync/observer.go | 76 +++++++++++++++++++++++++++++++ sync/observer_test.go | 103 ++++++++++++++++++++++++++++++++++++++++++ sync/provider.go | 48 +++++++++++++++++++- sync/provider_test.go | 23 ++-------- 5 files changed, 244 insertions(+), 20 deletions(-) create mode 100644 resource/bitmap.go create mode 100644 sync/observer.go create mode 100644 sync/observer_test.go diff --git a/resource/bitmap.go b/resource/bitmap.go new file mode 100644 index 0000000..8284841 --- /dev/null +++ b/resource/bitmap.go @@ -0,0 +1,14 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +// Initial reference and motivation taken from +// https://github.com/cilium/ipam/blob/master/service/allocator/bitmap.go +// However, we will require to use this while being backed by data store +// thus avoiding usage of big int and replacing it with multi-dimensional +// metrix of int64 + +package resource + +type Bitmap struct { + //Bits []uint64 `bson:bits,omitempty` +} diff --git a/sync/observer.go b/sync/observer.go new file mode 100644 index 0000000..e3c9eed --- /dev/null +++ b/sync/observer.go @@ -0,0 +1,76 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package sync + +import ( + "sync" + + "github.com/Prabhjot-Sethi/core/reconciler" +) + +type observerCountKey struct { + ExtKey any `bson:"_id.extKey,omitempty"` +} + +type observerTable struct { + reconciler.ManagerImpl + mu sync.RWMutex + providers map[any]struct{} +} + +func (o *observerTable) getProviderList() []any { + list := []any{} + func() { + o.mu.Lock() + defer o.mu.Unlock() + for k := range o.providers { + list = append(list, k) + } + }() + return list +} + +func (o *observerTable) isProviderAvailable(key any) bool { + o.mu.Lock() + defer o.mu.Unlock() + _, ok := o.providers[key] + return ok +} + +func (o *observerTable) deleteProvider(key any) { + var ok bool + func() { + o.mu.Lock() + defer o.mu.Unlock() + _, ok = o.providers[key] + if ok { + delete(o.providers, key) + } + }() + if ok { + // since the observer is removed trigger an update + // for controllers + o.NotifyCallback(key) + } +} + +func (o *observerTable) insertProvider(key any) { + var ok bool + func() { + o.mu.Lock() + defer o.mu.Unlock() + _, ok = o.providers[key] + if !ok { + o.providers[key] = struct{}{} + } + }() + if !ok { + // Notify an insert of new provider to providers + o.NotifyCallback(key) + } +} + +func (o *observerTable) ReconcilerGetAllKeys() []any { + return o.getProviderList() +} diff --git a/sync/observer_test.go b/sync/observer_test.go new file mode 100644 index 0000000..ccdd37e --- /dev/null +++ b/sync/observer_test.go @@ -0,0 +1,103 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package sync + +import ( + "context" + "log" + "testing" + "time" + + "github.com/Prabhjot-Sethi/core/db" + "github.com/Prabhjot-Sethi/core/errors" + "github.com/Prabhjot-Sethi/core/reconciler" +) + +type MyObserver struct { + reconciler.Controller + providers map[string]struct{} + tbl *ProviderTable +} + +func (o *MyObserver) Reconcile(k any) (*reconciler.Result, error) { + key := k.(string) + if key == "" { + log.Panicln("Got invalid key response") + } + if o.tbl.IsProviderAvailable(key) { + o.providers[key] = struct{}{} + } else { + delete(o.providers, key) + } + return &reconciler.Result{}, nil +} + +func Test_ObserverBaseTesting(t *testing.T) { + config := &db.MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := db.NewMongoClient(config) + + if err != nil { + t.Errorf("failed to connect to mongo DB Error: %s", err) + return + } + + s := client.GetDataStore("test-sync") + + err = InitializeOwner(context.Background(), s, "test-owner") + if err != nil && !errors.IsAlreadyExists(err) { + t.Errorf("Got error while initializing sync owner %s", err) + } + + tbl, err := LocateProviderTable(s) + if err != nil { + t.Errorf("failed to locate provider Table: %s", err) + } + + obs := &MyObserver{ + providers: map[string]struct{}{}, + tbl: tbl, + } + tbl.Register("test-observer", obs) + + provider, err := tbl.CreateProvider(context.Background(), "test-key") + if err != nil { + t.Errorf("failed to create Provider: %s", err) + } + time.Sleep(1 * time.Second) + if len(obs.providers) != 1 { + t.Errorf("Expected 1 provider but got %d", len(obs.providers)) + } + + providerDup, err := tbl.CreateProvider(context.Background(), "test-key") + if err != nil { + t.Errorf("failed to create Provider: %s", err) + } + time.Sleep(1 * time.Second) + if len(obs.providers) != 1 { + t.Errorf("Expected 1 provider but got %d", len(obs.providers)) + } + + provider1, err := tbl.CreateProvider(context.Background(), "test-key-1") + if err != nil { + t.Errorf("failed to create provider: %s", err) + } + time.Sleep(1 * time.Second) + if len(obs.providers) != 2 { + t.Errorf("Expected 2 provider but got %d", len(obs.providers)) + } + _ = provider1.Close() + _ = providerDup.Close() + _ = provider.Close() + + time.Sleep(2 * time.Second) + if len(obs.providers) != 0 { + t.Errorf("Expected 0 provider but got %d", len(obs.providers)) + } +} diff --git a/sync/provider.go b/sync/provider.go index 9dfc0fc..34917a4 100644 --- a/sync/provider.go +++ b/sync/provider.go @@ -15,6 +15,7 @@ import ( "github.com/Prabhjot-Sethi/core/db" "github.com/Prabhjot-Sethi/core/errors" + "github.com/Prabhjot-Sethi/core/reconciler" ) const ( @@ -62,6 +63,9 @@ type ProviderTable struct { // Context cancel function cancelFn context.CancelFunc + + // observer table + oTbl *observerTable } // Provider Table callback function, currently meant for @@ -71,8 +75,23 @@ type ProviderTable struct { func (t *ProviderTable) Callback(op string, wKey interface{}) { key := wKey.(*providerKey) + // ensure updating the observer table based on availability + // or unavailability of provider + obKey := &observerCountKey{ + ExtKey: key.ExtKey, + } + cnt, err := t.col.Count(context.Background(), obKey) + if err != nil { + log.Panicf("failed to fetch count of providers: %s", err) + } + if cnt == 0 { + t.oTbl.deleteProvider(obKey.ExtKey) + } else { + t.oTbl.insertProvider(obKey.ExtKey) + } + entry := &providerData{} - err := t.col.FindOne(context.Background(), key, entry) + err = t.col.FindOne(context.Background(), key, entry) if err != nil { if errors.IsNotFound(err) { return @@ -119,6 +138,22 @@ func (t *ProviderTable) handleOwnerRelease(op string, wKey any) { } } +// Allow a reconciler controller to register and get notified for availability +// and unavailability of providers +func (t *ProviderTable) Register(name string, crtl reconciler.Controller) error { + return t.oTbl.Register(name, crtl) +} + +// Get List of Providers +func (t *ProviderTable) GetProviderList() []any { + return t.oTbl.getProviderList() +} + +// Checks if provider exists +func (t *ProviderTable) IsProviderAvailable(key any) bool { + return t.oTbl.isProviderAvailable(key) +} + // create provider based on the specified key, typically a string, // Returns Provider handle, allowing to close the provider func (t *ProviderTable) CreateProvider(ctx context.Context, extKey any) (*Provider, error) { @@ -175,6 +210,15 @@ func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, e col: col, ctx: ctx, cancelFn: cancelFn, + oTbl: &observerTable{ + providers: make(map[any]struct{}), + }, + } + + err := table.oTbl.Initialize(ctx, table.oTbl) + if err != nil { + cancelFn() + return nil, err } matchDeleteStage := mongo.Pipeline{ @@ -188,7 +232,7 @@ func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, e } // watch only for delete notification of lock owner - err := ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease) + err = ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease) if err != nil { cancelFn() return nil, err diff --git a/sync/provider_test.go b/sync/provider_test.go index 3def3bb..bd20dd5 100644 --- a/sync/provider_test.go +++ b/sync/provider_test.go @@ -12,11 +12,6 @@ import ( "github.com/Prabhjot-Sethi/core/errors" ) -type myProviderKey struct { - Scope string - Name string -} - func Test_ProviderBaseTesting(t *testing.T) { config := &db.MongoConfig{ Host: "localhost", @@ -45,29 +40,21 @@ func Test_ProviderBaseTesting(t *testing.T) { t.Errorf("failed to locate provider Table: %s", err) } - key1 := &myProviderKey{ - Scope: "scope-1", - Name: "test-key", - } - - provider, err := tbl.CreateProvider(context.Background(), key1) + provider, err := tbl.CreateProvider(context.Background(), "test-key") if err != nil { t.Errorf("failed to create Provider: %s", err) } - _, err = tbl.CreateProvider(context.Background(), key1) + providerDup, err := tbl.CreateProvider(context.Background(), "test-key") if err != nil { t.Errorf("failed to create Provider: %s", err) } - key2 := &myProviderKey{ - Scope: "scope-2", - Name: "test-key", - } - provider1, err := tbl.CreateProvider(context.Background(), key2) + provider1, err := tbl.CreateProvider(context.Background(), "test-key-1") if err != nil { - t.Errorf("failed to acquire lock: %s", err) + t.Errorf("failed to create provider: %s", err) } _ = provider1.Close() + _ = providerDup.Close() _ = provider.Close() time.Sleep(2 * time.Second)