From c2c70ec623cbad5f542170c744dcd2c9b5b0c9a8 Mon Sep 17 00:00:00 2001 From: Prabhjot Singh Sethi Date: Tue, 15 Apr 2025 16:22:34 +0000 Subject: [PATCH] Initial structure to enable owner infra for locks These locks will be enabled to work across different processes or microservices, which using database as the common point for synchronization Signed-off-by: Prabhjot Singh Sethi --- go.mod | 1 + go.sum | 2 + lock/README.md | 40 +++++++++++++ lock/owner.go | 159 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 202 insertions(+) create mode 100644 lock/README.md create mode 100644 lock/owner.go diff --git a/go.mod b/go.mod index f48f299..251d46d 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( require ( github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.18.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index 5430bfc..9890507 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= diff --git a/lock/README.md b/lock/README.md new file mode 100644 index 0000000..946b31e --- /dev/null +++ b/lock/README.md @@ -0,0 +1,40 @@ +# Lock Infrastructure + +While working 
with multi threaded programming, one heavily relies on and makes use
of a mutex, to take a lock/ownership ensuring some sort of synchronization
construct between the different threads or go routines or concurrent tasks.
This enables taking care of race conditions between them, ensuring no over
stepping between concurrent executors.

However, there is nothing standard that exists providing the necessary
synchronization between different processes, running on the same machine or
on different ones. When working with kubernetes applications using a
microservices based architecture, with horizontal scaling capabilities, one
would be running multiple instances of the same application, where each
instance with the same logic would try to perform the same kind of operations
based on the events received, which would require a mutex like construct to
function across microservices for synchronization between two different
processes, possibly running on different nodes of the kubernetes cluster.

TODO(Prabhjot) insert a diagram of every process interacting with synchronizer

## Solution

As a base construct, a Database is one of the entities every process or
microservice instance interacts with, where the Database ensures reliability
and consistency as part of its internal processes. While working with
databases, a Database also ensures that an entry written to it is rejected if
a conflicting Key already exists in the database, which provides the base
fundamental construct needed for enabling mutex functionality across
processes. Thus the Database is a strong candidate that provides the
synchronizer constructs. 


TODO(Prabhjot) insert a diagram of every process interacting with database as synchronizer

However, one of the basic implicit constructs of a mutex is being ephemeral,
so if a process restarts, the construct of the mutex is lost and everything
will start afresh; but this now becomes tricky in the case of database based
entries, as whenever a process dies while holding a mutex or lock, someone
needs to ensure sanity of the system by identifying that the lock is held by a
process which is no longer active, and thus clear it up allowing the
continuity of operations

TODO(Prabhjot) add details of the lock owner handling details
\ No newline at end of file
diff --git a/lock/owner.go b/lock/owner.go
new file mode 100644
index 0000000..785bbcf
--- /dev/null
+++ b/lock/owner.go
@@ -0,0 +1,159 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi

package lock

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/google/uuid"
	"go.mongodb.org/mongo-driver/bson"

	"github.com/Prabhjot-Sethi/core/db"
	"github.com/Prabhjot-Sethi/core/errors"
)

const (
	// collection name for Lock ownership table
	lockOwnerShipCollection = "lock-owner-table"

	// default periodic interval for updating
	// last seen time for owner, in seconds
	defaultOwnerUpdateInterval = 10

	// default number of iterations missed before
	// aging out an entry
	defaultOwnerAgeUpdateMissed = 3
)

type ownerKey struct {
	Name string `bson:"name,omitempty"`
}

type ownerData struct {
	LastSeen int64 `bson:"lastSeen,omitempty"`
}

type ownerTableType struct {
	ctx            context.Context
	store          db.Store
	col            db.StoreCollection
	name           string
	key            *ownerKey
	updateInterval time.Duration
}

func (t *ownerTableType) updateLastSeen() {
	data := &ownerData{
		LastSeen: time.Now().Unix(),
	}
	err := t.col.UpdateOne(context.Background(), t.key, data, false)
	if err != nil {
		log.Panicf("failed to update ownership table: %s", err)
	
} +} + +func (t *ownerTableType) deleteAgedOwnerTableEntries() { + // delete multiple entires, those have atleast missed + // threshold count of age to timout an entry + filterTime := time.Now().Add(-1 * defaultOwnerAgeUpdateMissed * t.updateInterval).Unix() + + filter := bson.D{ + { + Key: "", + Value: bson.D{{Key: "$lt", Value: filterTime}}, + }, + } + _, err := t.col.DeleteMany(t.ctx, filter) + if err != nil && !errors.IsNotFound(err) { + log.Printf("failed to perform delete of aged lock owner entries") + } +} + +func (t *ownerTableType) allocateOwner(name string) error { + id := name + if id == "" { + id = "unknown" + } + data := &ownerData{ + LastSeen: time.Now().Unix(), + } + uid := uuid.New() + if t.key == nil { + t.key = &ownerKey{ + Name: id + "-" + uid.String(), + } + } + err := t.col.InsertOne(context.Background(), t.key, data) + if err != nil { + return err + } + + // start a go routine to keep updating the Last Seen time + // periodically, ensuring that we keep the entry active and + // not letting it age out + go func() { + ticker := time.NewTicker(t.updateInterval) + for { + select { + case <-ticker.C: + // Trigger a delete for all entries those have atleast + // missed default number of updates to the database + // this helps aging out the entry + t.updateLastSeen() + t.deleteAgedOwnerTableEntries() + case <-t.ctx.Done(): + // exit the update loop as the context under which + // this was running is already closed + // while exiting also ensure that self ownership is + // released + err = t.col.DeleteOne(context.Background(), t.key) + if err != nil { + log.Printf("failed deleting self owner lock: %s, got error: %s", t.key.Name, err) + } + return + } + } + }() + return nil +} + +var ( + // singleton object for owner table + ownerTable *ownerTableType + + // mutex for safe initialization of owner table + ownerTableInit sync.Mutex +) + +// Initialize the Lock Owner management constructs, anyone while working with +// this library requires to use this 
function before actually start consuming +// any functionality from here. +// Also it is callers responsibility to ensure providing uniform store +// definition for all the consuming processes to ensure synchronisation to work +// in a seemless manner +func InitializeLockOwner(ctx context.Context, store db.Store, name string) error { + ownerTableInit.Lock() + defer ownerTableInit.Unlock() + if ownerTable != nil { + return errors.Wrap(errors.AlreadyExists, "Lock Owner is already initialized") + } + + col := store.GetCollection(lockOwnerShipCollection) + + ownerTable = &ownerTableType{ + ctx: ctx, + store: store, + col: col, + name: name, + updateInterval: time.Duration(defaultOwnerUpdateInterval * time.Second), + } + + // allocate owner entry context + ownerTable.allocateOwner(name) + + return nil +}