Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sync/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test_LockBaseTesting(t *testing.T) {

s := client.GetDataStore("test-sync")

err = InitializeLockOwner(context.Background(), s, "test-owner")
err = InitializeOwner(context.Background(), s, "test-owner")
if err != nil && !errors.IsAlreadyExists(err) {
t.Errorf("Got error while initializing lock owner %s", err)
}
Expand Down
24 changes: 12 additions & 12 deletions sync/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
)

const (
// collection name for Lock ownership table
lockOwnerShipCollection = "lock-owner-table"
// collection name for sync ownership table
ownerShipCollection = "owner-table"

// default periodic interval for updating
// last seen time for owner, in seconds
Expand Down Expand Up @@ -51,7 +51,7 @@ type ownerTableType struct {
func (t *ownerTableType) DeleteCallback(op string, wKey interface{}) {
key := wKey.(*ownerKey)
if key.Name == t.key.Name {
log.Panicln("receiving delete notification of self lock")
log.Panicln("OnwerTable: receiving delete notification of self")
}
}

Expand All @@ -78,7 +78,7 @@ func (t *ownerTableType) deleteAgedOwnerTableEntries() {
}
_, err := t.col.DeleteMany(t.ctx, filter)
if err != nil && !errors.IsNotFound(err) {
log.Printf("failed to perform delete of aged lock owner entries")
log.Printf("failed to perform delete of aged owner table entries")
}
}

Expand Down Expand Up @@ -143,7 +143,7 @@ func (t *ownerTableType) allocateOwner(name string) error {
// released
err = t.col.DeleteOne(context.Background(), t.key)
if err != nil {
log.Printf("failed deleting self owner lock: %s, got error: %s", t.key.Name, err)
log.Printf("failed deleting self owner entry: %s, got error: %s", t.key.Name, err)
}
return
}
Expand All @@ -160,31 +160,31 @@ var (
ownerTableInit sync.Mutex
)

// Initialize the Lock Owner management constructs, anyone while working with
// Initialize the Sync Owner management constructs, anyone while working with
// this library requires to use this function before actually start consuming
// any functionality from here.
// Also it is callers responsibility to ensure providing uniform store
// definition for all the consuming processes to ensure synchronisation to work
// in a seemless manner
func InitializeLockOwner(ctx context.Context, store db.Store, name string) error {
return InitializeLockOwnerWithUpdateInterval(ctx, store, name, defaultOwnerUpdateInterval)
func InitializeOwner(ctx context.Context, store db.Store, name string) error {
return InitializeOwnerWithUpdateInterval(ctx, store, name, defaultOwnerUpdateInterval)
}

// Initialize the Lock Owner management constructs, anyone while working with
// Initialize the Owner management constructs, anyone while working with
// this library requires to use this function before actually start consuming
// any functionality from here.
// This also allows specifying the interval to ensuring configurability
// Also it is callers responsibility to ensure providing uniform store
// definition for all the consuming processes to ensure synchronisation to work
// in a seemless manner
func InitializeLockOwnerWithUpdateInterval(ctx context.Context, store db.Store, name string, interval time.Duration) error {
func InitializeOwnerWithUpdateInterval(ctx context.Context, store db.Store, name string, interval time.Duration) error {
ownerTableInit.Lock()
defer ownerTableInit.Unlock()
if ownerTable != nil {
return errors.Wrap(errors.AlreadyExists, "Lock Owner is already initialized")
return errors.Wrap(errors.AlreadyExists, "Sync Owner Table is already initialized")
}

col := store.GetCollection(lockOwnerShipCollection)
col := store.GetCollection(ownerShipCollection)

ownerTable = &ownerTableType{
ctx: ctx,
Expand Down
4 changes: 2 additions & 2 deletions sync/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func Test_OwnerInit(t *testing.T) {
}

s := client.GetDataStore("test-sync")
err = InitializeLockOwner(ctx, s, "test-owner")
err = InitializeOwner(ctx, s, "test-owner")
if err != nil && !errors.IsAlreadyExists(err) {
t.Errorf("Got error while initializing lock owner %s", err)
t.Errorf("Got error while initializing sync owner %s", err)
}
time.Sleep(1 * time.Second)
}
225 changes: 225 additions & 0 deletions sync/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package sync

import (
"context"
"log"
"reflect"
"time"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
)

const (
// standard provider table name
defaultProviderTableName = "provider-table"
)

var (
// singleton object for provider table
providerTable *ProviderTable
)

type providerKey struct {
ExtKey any `bson:"extKey,omitempty"`
ProviderId uuid.UUID `bson:"providerId,omitempty"`
CreateTime int64 `bson:"createTime,omitempty"`
}

type Provider struct {
key *providerKey
tbl *ProviderTable
}

type listKeyEntry struct {
Key *providerKey `bson:"_id,omitempty"`
}

func (p *Provider) Close() error {
return p.tbl.col.DeleteOne(context.Background(), p.key)
}

type providerData struct {
Owner string `bson:"owner,omitempty"`
}

type ProviderTable struct {
// collection name hosting locks for the table
colName string

// collection object for the database store
col db.StoreCollection

// context in which this lock table is being working on
ctx context.Context

// Context cancel function
cancelFn context.CancelFunc
}

// Provider Table callback function, currently meant for
// clearing providers once the owner corresponding to those
// providers no longer exists
// eventually we will handle observers also as part of this
func (t *ProviderTable) Callback(op string, wKey interface{}) {
key := wKey.(*providerKey)

entry := &providerData{}
err := t.col.FindOne(context.Background(), key, entry)
if err != nil {
if errors.IsNotFound(err) {
return
}
log.Panicf("failed to find the entry while working with: %s", err)
}

oKey := &ownerKey{
Name: entry.Owner,
}

oData := &ownerData{}

err = ownerTable.col.FindOne(context.Background(), oKey, oData)

if err != nil {
if errors.IsNotFound(err) {
filter := bson.D{{
Key: "owner",
Value: oKey.Name,
}}
_, err := t.col.DeleteMany(t.ctx, filter)
if err != nil && !errors.IsNotFound(err) {
log.Panicf("failed to perform delete of providers for owner %s, got error: %s", oKey.Name, err)
}
}
}
}

// callback function to handle release of owner object
// responsible for clearing all the providers initiated
// by the specific owner, typically this is run by other
// participating microservices / processes
func (t *ProviderTable) handleOwnerRelease(op string, wKey any) {
key := wKey.(*ownerKey)

filter := bson.D{{
Key: "owner",
Value: key.Name,
}}
_, err := t.col.DeleteMany(t.ctx, filter)
if err != nil && !errors.IsNotFound(err) {
log.Panicf("failed to perform delete of providers for owner %s, got error: %s", key.Name, err)
}
}

// create provider based on the specified key, typically a string,
// Returns Provider handle, allowing to close the provider
func (t *ProviderTable) CreateProvider(ctx context.Context, extKey any) (*Provider, error) {
// if ownertable is not initialized, then lock infra cannot be used
if ownerTable == nil || ownerTable.key == nil {
return nil, errors.Wrap(errors.InvalidArgument, "owner infra for provider table is not initialized")
}

key := &providerKey{
ExtKey: extKey,
CreateTime: time.Now().Unix(),
ProviderId: uuid.New(),
}

data := &providerData{
Owner: ownerTable.key.Name,
}

err := t.col.InsertOne(ctx, key, data)
if err != nil {
return nil, err
}

return &Provider{
key: key,
tbl: t,
}, nil
}

// Locate Provider table with pre-specified table name
// while working out of standard provider table
func LocateProviderTable(store db.Store) (*ProviderTable, error) {
return LocateProviderTableWithName(store, defaultProviderTableName)
}

// Locate Provider table with specific table name
// meant for consumers want to work out of non standard Provider tables
func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, error) {
if providerTable != nil {
return providerTable, nil
}

// ensure owner table is initialized before proceeding further
if ownerTable == nil {
return nil, errors.Wrap(errors.InvalidArgument, "Mandatory! owner table infra not initialized")
}

ctx, cancelFn := context.WithCancel(ownerTable.ctx)

// no existing table found, allocate a new one
col := store.GetCollection(name)
table := &ProviderTable{
colName: name,
col: col,
ctx: ctx,
cancelFn: cancelFn,
}

matchDeleteStage := mongo.Pipeline{
bson.D{{
Key: "$match",
Value: bson.D{{
Key: "operationType",
Value: "delete",
}},
}},
}

// watch only for delete notification of lock owner
err := ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease)
if err != nil {
cancelFn()
return nil, err
}

err = table.col.SetKeyType(reflect.TypeOf(&providerKey{}))
if err != nil {
cancelFn()
return nil, err
}

// register to watch for locks, this is relevant for external
// notification and cleanup as part of handling of release of owners
err = table.col.Watch(ctx, nil, table.Callback)
if err != nil {
cancelFn()
return nil, err
}

go func() {
list := []listKeyEntry{}

err = table.col.FindMany(ctx, nil, &list)
if err != nil {
return
}

for _, entry := range list {
table.Callback("insert", entry.Key)
}
}()

return table, nil
}
74 changes: 74 additions & 0 deletions sync/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package sync

import (
"context"
"testing"
"time"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
)

type myProviderKey struct {
Scope string
Name string
}

func Test_ProviderBaseTesting(t *testing.T) {
config := &db.MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := db.NewMongoClient(config)

if err != nil {
t.Errorf("failed to connect to mongo DB Error: %s", err)
return
}

s := client.GetDataStore("test-sync")

err = InitializeOwner(context.Background(), s, "test-owner")
if err != nil && !errors.IsAlreadyExists(err) {
t.Errorf("Got error while initializing sync owner %s", err)
}

tbl, err := LocateProviderTable(s)

if err != nil {
t.Errorf("failed to locate provider Table: %s", err)
}

key1 := &myProviderKey{
Scope: "scope-1",
Name: "test-key",
}

provider, err := tbl.CreateProvider(context.Background(), key1)
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
_, err = tbl.CreateProvider(context.Background(), key1)
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
key2 := &myProviderKey{
Scope: "scope-2",
Name: "test-key",
}

provider1, err := tbl.CreateProvider(context.Background(), key2)
if err != nil {
t.Errorf("failed to acquire lock: %s", err)
}
_ = provider1.Close()
_ = provider.Close()

time.Sleep(2 * time.Second)
}
Loading