diff --git a/sync/lock_test.go b/sync/lock_test.go index c99f560..33dff7a 100644 --- a/sync/lock_test.go +++ b/sync/lock_test.go @@ -33,7 +33,7 @@ func Test_LockBaseTesting(t *testing.T) { s := client.GetDataStore("test-sync") - err = InitializeLockOwner(context.Background(), s, "test-owner") + err = InitializeOwner(context.Background(), s, "test-owner") if err != nil && !errors.IsAlreadyExists(err) { t.Errorf("Got error while initializing lock owner %s", err) } diff --git a/sync/owner.go b/sync/owner.go index 9779604..57cf5f5 100644 --- a/sync/owner.go +++ b/sync/owner.go @@ -19,8 +19,8 @@ import ( ) const ( - // collection name for Lock ownership table - lockOwnerShipCollection = "lock-owner-table" + // collection name for sync ownership table + ownerShipCollection = "owner-table" // default periodic interval for updating // last seen time for owner, in seconds @@ -51,7 +51,7 @@ type ownerTableType struct { func (t *ownerTableType) DeleteCallback(op string, wKey interface{}) { key := wKey.(*ownerKey) if key.Name == t.key.Name { - log.Panicln("receiving delete notification of self lock") + log.Panicln("OnwerTable: receiving delete notification of self") } } @@ -78,7 +78,7 @@ func (t *ownerTableType) deleteAgedOwnerTableEntries() { } _, err := t.col.DeleteMany(t.ctx, filter) if err != nil && !errors.IsNotFound(err) { - log.Printf("failed to perform delete of aged lock owner entries") + log.Printf("failed to perform delete of aged owner table entries") } } @@ -143,7 +143,7 @@ func (t *ownerTableType) allocateOwner(name string) error { // released err = t.col.DeleteOne(context.Background(), t.key) if err != nil { - log.Printf("failed deleting self owner lock: %s, got error: %s", t.key.Name, err) + log.Printf("failed deleting self owner entry: %s, got error: %s", t.key.Name, err) } return } @@ -160,31 +160,31 @@ var ( ownerTableInit sync.Mutex ) -// Initialize the Lock Owner management constructs, anyone while working with +// Initialize the Sync Owner management constructs, anyone while working with // this library requires to use this function before actually start consuming // any functionality from here. // Also it is callers responsibility to ensure providing uniform store // definition for all the consuming processes to ensure synchronisation to work // in a seemless manner -func InitializeLockOwner(ctx context.Context, store db.Store, name string) error { - return InitializeLockOwnerWithUpdateInterval(ctx, store, name, defaultOwnerUpdateInterval) +func InitializeOwner(ctx context.Context, store db.Store, name string) error { + return InitializeOwnerWithUpdateInterval(ctx, store, name, defaultOwnerUpdateInterval) } -// Initialize the Lock Owner management constructs, anyone while working with +// Initialize the Owner management constructs, anyone while working with // this library requires to use this function before actually start consuming // any functionality from here. // This also allows specifying the interval to ensuring configurability // Also it is callers responsibility to ensure providing uniform store // definition for all the consuming processes to ensure synchronisation to work // in a seemless manner -func InitializeLockOwnerWithUpdateInterval(ctx context.Context, store db.Store, name string, interval time.Duration) error { +func InitializeOwnerWithUpdateInterval(ctx context.Context, store db.Store, name string, interval time.Duration) error { ownerTableInit.Lock() defer ownerTableInit.Unlock() if ownerTable != nil { - return errors.Wrap(errors.AlreadyExists, "Lock Owner is already initialized") + return errors.Wrap(errors.AlreadyExists, "Sync Owner Table is already initialized") } - col := store.GetCollection(lockOwnerShipCollection) + col := store.GetCollection(ownerShipCollection) ownerTable = &ownerTableType{ ctx: ctx, diff --git a/sync/owner_test.go b/sync/owner_test.go index 73321cb..917ac73 100644 --- a/sync/owner_test.go +++ b/sync/owner_test.go @@ -36,9 +36,9 @@ func Test_OwnerInit(t *testing.T) { } s := client.GetDataStore("test-sync") - err = InitializeLockOwner(ctx, s, "test-owner") + err = InitializeOwner(ctx, s, "test-owner") if err != nil && !errors.IsAlreadyExists(err) { - t.Errorf("Got error while initializing lock owner %s", err) + t.Errorf("Got error while initializing sync owner %s", err) } time.Sleep(1 * time.Second) } diff --git a/sync/provider.go b/sync/provider.go new file mode 100644 index 0000000..9dfc0fc --- /dev/null +++ b/sync/provider.go @@ -0,0 +1,225 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package sync + +import ( + "context" + "log" + "reflect" + "time" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + + "github.com/Prabhjot-Sethi/core/db" + "github.com/Prabhjot-Sethi/core/errors" +) + +const ( + // standard provider table name + defaultProviderTableName = "provider-table" +) + +var ( + // singleton object for provider table + providerTable *ProviderTable +) + +type providerKey struct { + ExtKey any `bson:"extKey,omitempty"` + ProviderId uuid.UUID `bson:"providerId,omitempty"` + CreateTime int64 `bson:"createTime,omitempty"` +} + +type Provider struct { + key *providerKey + tbl *ProviderTable +} + +type listKeyEntry struct { + Key *providerKey `bson:"_id,omitempty"` +} + +func (p *Provider) Close() error { + return p.tbl.col.DeleteOne(context.Background(), p.key) +} + +type providerData struct { + Owner string `bson:"owner,omitempty"` +} + +type ProviderTable struct { + // collection name hosting locks for the table + colName string + + // collection object for the database store + col db.StoreCollection + + // context in which this lock table is being working on + ctx context.Context + + // Context cancel function + cancelFn context.CancelFunc +} + +// Provider Table callback function, currently meant for +// clearing providers once the owner corresponding to those +// providers no longer exists +// eventually we will handle observers also as part of this +func (t *ProviderTable) Callback(op string, wKey interface{}) { + key := wKey.(*providerKey) + + entry := &providerData{} + err := t.col.FindOne(context.Background(), key, entry) + if err != nil { + if errors.IsNotFound(err) { + return + } + log.Panicf("failed to find the entry while working with: %s", err) + } + + oKey := &ownerKey{ + Name: entry.Owner, + } + + oData := &ownerData{} + + err = ownerTable.col.FindOne(context.Background(), oKey, oData) + + if err != nil { + if errors.IsNotFound(err) { + filter := bson.D{{ + Key: "owner", + Value: oKey.Name, + }} + _, err := t.col.DeleteMany(t.ctx, filter) + if err != nil && !errors.IsNotFound(err) { + log.Panicf("failed to perform delete of providers for owner %s, got error: %s", oKey.Name, err) + } + } + } +} + +// callback function to handle release of owner object +// responsible for clearing all the providers initiated +// by the specific owner, typically this is run by other +// participating microservices / processes +func (t *ProviderTable) handleOwnerRelease(op string, wKey any) { + key := wKey.(*ownerKey) + + filter := bson.D{{ + Key: "owner", + Value: key.Name, + }} + _, err := t.col.DeleteMany(t.ctx, filter) + if err != nil && !errors.IsNotFound(err) { + log.Panicf("failed to perform delete of providers for owner %s, got error: %s", key.Name, err) + } +} + +// create provider based on the specified key, typically a string, +// Returns Provider handle, allowing to close the provider +func (t *ProviderTable) CreateProvider(ctx context.Context, extKey any) (*Provider, error) { + // if ownertable is not initialized, then lock infra cannot be used + if ownerTable == nil || ownerTable.key == nil { + return nil, errors.Wrap(errors.InvalidArgument, "owner infra for provider table is not initialized") + } + + key := &providerKey{ + ExtKey: extKey, + CreateTime: time.Now().Unix(), + ProviderId: uuid.New(), + } + + data := &providerData{ + Owner: ownerTable.key.Name, + } + + err := t.col.InsertOne(ctx, key, data) + if err != nil { + return nil, err + } + + return &Provider{ + key: key, + tbl: t, + }, nil +} + +// Locate Provider table with pre-specified table name +// while working out of standard provider table +func LocateProviderTable(store db.Store) (*ProviderTable, error) { + return LocateProviderTableWithName(store, defaultProviderTableName) +} + +// Locate Provider table with specific table name +// meant for consumers want to work out of non standard Provider tables +func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, error) { + if providerTable != nil { + return providerTable, nil + } + + // ensure owner table is initialized before proceeding further + if ownerTable == nil { + return nil, errors.Wrap(errors.InvalidArgument, "Mandatory! owner table infra not initialized") + } + + ctx, cancelFn := context.WithCancel(ownerTable.ctx) + + // no existing table found, allocate a new one + col := store.GetCollection(name) + table := &ProviderTable{ + colName: name, + col: col, + ctx: ctx, + cancelFn: cancelFn, + } + + matchDeleteStage := mongo.Pipeline{ + bson.D{{ + Key: "$match", + Value: bson.D{{ + Key: "operationType", + Value: "delete", + }}, + }}, + } + + // watch only for delete notification of lock owner + err := ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease) + if err != nil { + cancelFn() + return nil, err + } + + err = table.col.SetKeyType(reflect.TypeOf(&providerKey{})) + if err != nil { + cancelFn() + return nil, err + } + + // register to watch for locks, this is relevant for external + // notification and cleanup as part of handling of release of owners + err = table.col.Watch(ctx, nil, table.Callback) + if err != nil { + cancelFn() + return nil, err + } + + go func() { + list := []listKeyEntry{} + + err = table.col.FindMany(ctx, nil, &list) + if err != nil { + return + } + + for _, entry := range list { + table.Callback("insert", entry.Key) + } + }() + + return table, nil +} diff --git a/sync/provider_test.go b/sync/provider_test.go new file mode 100644 index 0000000..3def3bb --- /dev/null +++ b/sync/provider_test.go @@ -0,0 +1,74 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package sync + +import ( + "context" + "testing" + "time" + + "github.com/Prabhjot-Sethi/core/db" + "github.com/Prabhjot-Sethi/core/errors" +) + +type myProviderKey struct { + Scope string + Name string +} + +func Test_ProviderBaseTesting(t *testing.T) { + config := &db.MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := db.NewMongoClient(config) + + if err != nil { + t.Errorf("failed to connect to mongo DB Error: %s", err) + return + } + + s := client.GetDataStore("test-sync") + + err = InitializeOwner(context.Background(), s, "test-owner") + if err != nil && !errors.IsAlreadyExists(err) { + t.Errorf("Got error while initializing sync owner %s", err) + } + + tbl, err := LocateProviderTable(s) + + if err != nil { + t.Errorf("failed to locate provider Table: %s", err) + } + + key1 := &myProviderKey{ + Scope: "scope-1", + Name: "test-key", + } + + provider, err := tbl.CreateProvider(context.Background(), key1) + if err != nil { + t.Errorf("failed to create Provider: %s", err) + } + _, err = tbl.CreateProvider(context.Background(), key1) + if err != nil { + t.Errorf("failed to create Provider: %s", err) + } + key2 := &myProviderKey{ + Scope: "scope-2", + Name: "test-key", + } + + provider1, err := tbl.CreateProvider(context.Background(), key2) + if err != nil { + t.Errorf("failed to acquire lock: %s", err) + } + _ = provider1.Close() + _ = provider.Close() + + time.Sleep(2 * time.Second) +} diff --git a/sync/test/example.go b/sync/test/example.go index 832a6de..9f5c9e2 100644 --- a/sync/test/example.go +++ b/sync/test/example.go @@ -37,10 +37,17 @@ func main() { } s := client.GetDataStore("test-sync") - err = sync.InitializeLockOwnerWithUpdateInterval(ctx, s, "test-owner", 1) + err = sync.InitializeOwnerWithUpdateInterval(ctx, s, "test-owner", 10) if err != nil && !errors.IsAlreadyExists(err) { - log.Panicf("Got error while initializing lock owner %s", err) + log.Panicf("Got error while initializing sync owner %s", err) } + + _, err = sync.LocateProviderTable(s) + + if err != nil { + log.Panicf("failed to locate provider Table: %s", err) + } + for { // loop endlessly to run aging process for owner table time.Sleep(5 * time.Second)