Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion sync/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ var (
// definition for all the consuming processes to ensure synchronisation to work
// in a seemless manner
func InitializeLockOwner(ctx context.Context, store db.Store, name string) error {
return InitializeLockOwnerWithUpdateInterval(ctx, store, name, defaultOwnerUpdateInterval)
}

// Initialize the Lock Owner management constructs, anyone while working with
// this library requires to use this function before actually start consuming
// any functionality from here.
// This also allows specifying the interval to ensuring configurability
// Also it is callers responsibility to ensure providing uniform store
// definition for all the consuming processes to ensure synchronisation to work
// in a seemless manner
func InitializeLockOwnerWithUpdateInterval(ctx context.Context, store db.Store, name string, interval time.Duration) error {
ownerTableInit.Lock()
defer ownerTableInit.Unlock()
if ownerTable != nil {
Expand All @@ -180,7 +191,7 @@ func InitializeLockOwner(ctx context.Context, store db.Store, name string) error
store: store,
col: col,
name: name,
updateInterval: time.Duration(defaultOwnerUpdateInterval * time.Second),
updateInterval: time.Duration(interval * time.Second),
}

// allocate owner entry context
Expand Down
45 changes: 45 additions & 0 deletions sync/test/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"context"
"log"
"time"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
"github.com/Prabhjot-Sethi/core/sync"
)

func main() {
ctx, cancelfn := context.WithCancel(context.Background())
defer time.Sleep(2 * time.Second)
defer cancelfn()
config := &db.MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := db.NewMongoClient(config)

if err != nil {
log.Panicf("failed to connect to mongo DB Error: %s", err)
return
}

err = client.HealthCheck(context.Background())
if err != nil {
log.Panicf("failed to perform Health check with DB Error: %s", err)
}

s := client.GetDataStore("test-sync")
err = sync.InitializeLockOwnerWithUpdateInterval(ctx, s, "test-owner", 1)
if err != nil && !errors.IsAlreadyExists(err) {
log.Panicf("Got error while initializing lock owner %s", err)
}
for {
// loop endlessly to run aging process for owner table
time.Sleep(5 * time.Second)
}
}