Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (

require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.18.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand Down
40 changes: 40 additions & 0 deletions lock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Lock Infrastructure

When working with multi-threaded programming, one heavily relies on mutexes
to take a lock/ownership, providing a synchronization construct between
different threads, goroutines, or concurrent tasks. This takes care of race
conditions between them, ensuring no overstepping between concurrent
executors.

However, no standard construct exists that provides the necessary
synchronization between different processes, whether running on the same
machine or on different ones. Kubernetes applications using a
microservices-based architecture with horizontal scaling run multiple
instances of the same application, where each instance, carrying the same
logic, tries to perform the same kind of operations based on the events
received. This requires a mutex-like construct that works across
microservices, synchronizing two different processes possibly running on
different nodes of the Kubernetes cluster.

TODO(Prabhjot) insert a diagram of every process interacting with synchronizer

## Solution

As a base construct, the database is one entity that every process or
microservice instance interacts with, and the database ensures reliability
and consistency as part of its internal processes. A database also rejects
any entry written to it when a conflicting key already exists, which provides
the fundamental building block needed to enable mutex functionality across
processes. The database is therefore a strong candidate to provide the
synchronizer constructs.

TODO(Prabhjot) insert a diagram of every process interacting with database as synchronizer

However, one of the basic implicit properties of a mutex is that it is
ephemeral: if a process restarts, the mutex state is lost and everything
starts afresh. This becomes tricky with database-backed entries, because
whenever a process dies while holding a mutex or lock, someone needs to
ensure the sanity of the system by identifying that the lock is held by a
process that is no longer active, and clear it so operations can continue.

TODO(Prabhjot) add details of the lock owner handling details
159 changes: 159 additions & 0 deletions lock/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package lock

import (
"context"
"log"
"sync"
"time"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
)

const (
	// lockOwnerShipCollection is the database collection name backing
	// the lock ownership table shared by all participating processes.
	lockOwnerShipCollection = "lock-owner-table"

	// defaultOwnerUpdateInterval is the periodic interval, in seconds,
	// at which an owner refreshes its lastSeen timestamp.
	defaultOwnerUpdateInterval = 10

	// defaultOwnerAgeUpdateMissed is the number of consecutive missed
	// refresh intervals after which an owner entry is considered dead
	// and is aged out.
	defaultOwnerAgeUpdateMissed = 3
)

// ownerKey is the primary key of a lock owner entry: a unique name of
// the form "<process name>-<uuid>" generated at allocation time.
type ownerKey struct {
	Name string `bson:"name,omitempty"`
}

// ownerData holds the mutable state of a lock owner entry: the Unix
// timestamp (seconds) of the owner's most recent periodic refresh.
type ownerData struct {
	LastSeen int64 `bson:"lastSeen,omitempty"`
}

// ownerTableType is the process-wide singleton managing this process's
// entry in the lock ownership table, including its periodic refresh and
// the aging out of dead owners.
type ownerTableType struct {
	ctx            context.Context   // lifetime of the owner; cancellation releases ownership
	store          db.Store          // backing store supplied at initialization
	col            db.StoreCollection // collection holding owner entries
	name           string            // caller-supplied process name
	key            *ownerKey         // this process's unique owner key, set on allocation
	updateInterval time.Duration     // period between lastSeen refreshes
}

// updateLastSeen refreshes this owner's lastSeen timestamp in the
// ownership collection, keeping the entry active so it is not aged out.
//
// NOTE(review): a failed update panics the entire process via
// log.Panicf — confirm this crash-on-failure behavior is intended
// rather than a logged retry on the next tick.
func (t *ownerTableType) updateLastSeen() {
	entry := &ownerData{LastSeen: time.Now().Unix()}
	if err := t.col.UpdateOne(context.Background(), t.key, entry, false); err != nil {
		log.Panicf("failed to update ownership table: %s", err)
	}
}

// deleteAgedOwnerTableEntries removes all owner entries whose lastSeen
// timestamp is older than the aging threshold (updateInterval multiplied
// by the allowed number of missed refreshes). Such entries belong to
// owners presumed dead, and clearing them allows their locks to be
// recovered by the surviving processes.
func (t *ownerTableType) deleteAgedOwnerTableEntries() {
	// any entry not refreshed within this window has missed at least
	// defaultOwnerAgeUpdateMissed consecutive updates
	filterTime := time.Now().Add(-1 * defaultOwnerAgeUpdateMissed * t.updateInterval).Unix()

	// BUG FIX: the filter previously used an empty Key (""), which never
	// matched the stored "lastSeen" field, so aged entries were never
	// deleted.
	filter := bson.D{
		{
			Key:   "lastSeen",
			Value: bson.D{{Key: "$lt", Value: filterTime}},
		},
	}
	_, err := t.col.DeleteMany(t.ctx, filter)
	if err != nil && !errors.IsNotFound(err) {
		// include the error so failures are diagnosable
		log.Printf("failed to perform delete of aged lock owner entries: %s", err)
	}
}

// allocateOwner creates a uniquely named entry for this process in the
// ownership table and starts a background goroutine that periodically
// refreshes it and ages out entries of dead owners. The goroutine exits
// (releasing the entry) when t.ctx is cancelled. Returns the insertion
// error, e.g. when a conflicting key already exists.
func (t *ownerTableType) allocateOwner(name string) error {
	id := name
	if id == "" {
		id = "unknown"
	}
	data := &ownerData{
		LastSeen: time.Now().Unix(),
	}
	if t.key == nil {
		// suffix with a uuid so multiple instances sharing the same
		// process name still get distinct ownership entries
		t.key = &ownerKey{
			Name: id + "-" + uuid.New().String(),
		}
	}
	if err := t.col.InsertOne(context.Background(), t.key, data); err != nil {
		return err
	}

	// start a go routine to keep updating the Last Seen time
	// periodically, ensuring that we keep the entry active and
	// not letting it age out
	go func() {
		ticker := time.NewTicker(t.updateInterval)
		// BUG FIX: the ticker was never stopped; release its resources
		// when the loop exits
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				t.updateLastSeen()
				// also trigger a delete for all entries that have
				// missed at least the default number of updates,
				// aging out dead owners
				t.deleteAgedOwnerTableEntries()
			case <-t.ctx.Done():
				// the owning context is closed: exit the update loop
				// and release self ownership on the way out.
				// BUG FIX: previously this reassigned the enclosing
				// function's err variable after allocateOwner had
				// already returned (a latent data race); use a local
				// variable instead.
				if err := t.col.DeleteOne(context.Background(), t.key); err != nil {
					log.Printf("failed deleting self owner lock: %s, got error: %s", t.key.Name, err)
				}
				return
			}
		}
	}()
	return nil
}

var (
	// ownerTable is the process-wide singleton owner table, created by
	// InitializeLockOwner; nil until initialization succeeds.
	ownerTable *ownerTableType

	// ownerTableInit serializes initialization of ownerTable so that
	// concurrent InitializeLockOwner calls are safe.
	ownerTableInit sync.Mutex
)

// InitializeLockOwner initializes the lock owner management constructs.
// Anyone working with this library must call this function before
// consuming any other functionality from the package.
//
// It is the caller's responsibility to provide a uniform store
// definition across all consuming processes so that synchronization
// works seamlessly. Returns errors.AlreadyExists when called more than
// once, or the underlying store error when the owner entry cannot be
// created.
func InitializeLockOwner(ctx context.Context, store db.Store, name string) error {
	ownerTableInit.Lock()
	defer ownerTableInit.Unlock()
	if ownerTable != nil {
		return errors.Wrap(errors.AlreadyExists, "Lock Owner is already initialized")
	}

	col := store.GetCollection(lockOwnerShipCollection)

	ownerTable = &ownerTableType{
		ctx:   ctx,
		store: store,
		col:   col,
		name:  name,
		// an untyped constant multiplied by time.Second is already a
		// time.Duration; no explicit conversion needed
		updateInterval: defaultOwnerUpdateInterval * time.Second,
	}

	// BUG FIX: the error from allocateOwner was previously ignored,
	// which left a half-initialized singleton behind (with no refresh
	// goroutine) on failure; propagate it and roll back instead.
	if err := ownerTable.allocateOwner(name); err != nil {
		ownerTable = nil
		return err
	}

	return nil
}