Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ type LockTable struct {

// context in which this lock table is being working on
ctx context.Context

// Context cancel function
cancelFn context.CancelFunc
}

func (t *LockTable) Callback(op string, wKey interface{}) {
// handle callback as and when needed
// we may need notification of release of locks
// allowing others to start working on it
}

func (t *LockTable) handleOwnerRelease(op string, wKey interface{}) {
Expand Down Expand Up @@ -94,14 +103,17 @@ func LocateLockTable(store db.Store, name string) (*LockTable, error) {
if ownerTable == nil {
return nil, errors.Wrap(errors.InvalidArgument, "Mandatory! owner table infra not initialized")
}

ctx, cancelFn := context.WithCancel(ownerTable.ctx)

// no existing table found, allocate a new one
col := store.GetCollection(name)
table = &LockTable{
colName: name,
col: col,
ctx: ownerTable.ctx,
colName: name,
col: col,
ctx: ctx,
cancelFn: cancelFn,
}
lockTables[name] = table

matchDeleteStage := mongo.Pipeline{
bson.D{{
Expand All @@ -113,11 +125,22 @@ func LocateLockTable(store db.Store, name string) (*LockTable, error) {
}},
}

// watch only for delete notification
err := ownerTable.col.Watch(table.ctx, matchDeleteStage, table.handleOwnerRelease)
// watch only for delete notification of lock owner
err := ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease)
if err != nil {
cancelFn()
return nil, err
}

// register to watch for locks, this is relevant for external
// notification and cleanup as part of handling of release of owners
err = table.col.Watch(ctx, nil, table.Callback)
if err != nil {
cancelFn()
return nil, err
}

lockTables[name] = table
}

return table, nil
Expand Down