Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions resource/bitmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

// Initial reference and motivation taken from
// https://github.com/cilium/ipam/blob/master/service/allocator/bitmap.go
// However, we will require to use this while being backed by data store
// thus avoiding usage of big int and replacing it with multi-dimensional
// metrix of int64

package resource

type Bitmap struct {
//Bits []uint64 `bson:bits,omitempty`
}
76 changes: 76 additions & 0 deletions sync/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package sync

import (
"sync"

"github.com/Prabhjot-Sethi/core/reconciler"
)

type observerCountKey struct {
ExtKey any `bson:"_id.extKey,omitempty"`
}

type observerTable struct {
reconciler.ManagerImpl
mu sync.RWMutex
providers map[any]struct{}
}

func (o *observerTable) getProviderList() []any {
list := []any{}
func() {
o.mu.Lock()
defer o.mu.Unlock()
for k := range o.providers {
list = append(list, k)
}
}()
return list
}

func (o *observerTable) isProviderAvailable(key any) bool {
o.mu.Lock()
defer o.mu.Unlock()
_, ok := o.providers[key]
return ok
}

func (o *observerTable) deleteProvider(key any) {
var ok bool
func() {
o.mu.Lock()
defer o.mu.Unlock()
_, ok = o.providers[key]
if ok {
delete(o.providers, key)
}
}()
if ok {
// since the observer is removed trigger an update
// for controllers
o.NotifyCallback(key)
}
}

func (o *observerTable) insertProvider(key any) {
var ok bool
func() {
o.mu.Lock()
defer o.mu.Unlock()
_, ok = o.providers[key]
if !ok {
o.providers[key] = struct{}{}
}
}()
if !ok {
// Notify an insert of new provider to providers
o.NotifyCallback(key)
}
}

func (o *observerTable) ReconcilerGetAllKeys() []any {
return o.getProviderList()
}
103 changes: 103 additions & 0 deletions sync/observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package sync

import (
"context"
"log"
"testing"
"time"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
"github.com/Prabhjot-Sethi/core/reconciler"
)

type MyObserver struct {
reconciler.Controller
providers map[string]struct{}
tbl *ProviderTable
}

func (o *MyObserver) Reconcile(k any) (*reconciler.Result, error) {
key := k.(string)
if key == "" {
log.Panicln("Got invalid key response")
}
if o.tbl.IsProviderAvailable(key) {
o.providers[key] = struct{}{}
} else {
delete(o.providers, key)
}
return &reconciler.Result{}, nil
}

func Test_ObserverBaseTesting(t *testing.T) {
config := &db.MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := db.NewMongoClient(config)

if err != nil {
t.Errorf("failed to connect to mongo DB Error: %s", err)
return
}

s := client.GetDataStore("test-sync")

err = InitializeOwner(context.Background(), s, "test-owner")
if err != nil && !errors.IsAlreadyExists(err) {
t.Errorf("Got error while initializing sync owner %s", err)
}

tbl, err := LocateProviderTable(s)
if err != nil {
t.Errorf("failed to locate provider Table: %s", err)
}

obs := &MyObserver{
providers: map[string]struct{}{},
tbl: tbl,
}
tbl.Register("test-observer", obs)

provider, err := tbl.CreateProvider(context.Background(), "test-key")
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
time.Sleep(1 * time.Second)
if len(obs.providers) != 1 {
t.Errorf("Expected 1 provider but got %d", len(obs.providers))
}

providerDup, err := tbl.CreateProvider(context.Background(), "test-key")
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
time.Sleep(1 * time.Second)
if len(obs.providers) != 1 {
t.Errorf("Expected 1 provider but got %d", len(obs.providers))
}

provider1, err := tbl.CreateProvider(context.Background(), "test-key-1")
if err != nil {
t.Errorf("failed to create provider: %s", err)
}
time.Sleep(1 * time.Second)
if len(obs.providers) != 2 {
t.Errorf("Expected 2 provider but got %d", len(obs.providers))
}
_ = provider1.Close()
_ = providerDup.Close()
_ = provider.Close()

time.Sleep(2 * time.Second)
if len(obs.providers) != 0 {
t.Errorf("Expected 0 provider but got %d", len(obs.providers))
}
}
48 changes: 46 additions & 2 deletions sync/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
"github.com/Prabhjot-Sethi/core/reconciler"
)

const (
Expand Down Expand Up @@ -62,6 +63,9 @@ type ProviderTable struct {

// Context cancel function
cancelFn context.CancelFunc

// observer table
oTbl *observerTable
}

// Provider Table callback function, currently meant for
Expand All @@ -71,8 +75,23 @@ type ProviderTable struct {
func (t *ProviderTable) Callback(op string, wKey interface{}) {
key := wKey.(*providerKey)

// ensure updating the observer table based on availability
// or unavailability of provider
obKey := &observerCountKey{
ExtKey: key.ExtKey,
}
cnt, err := t.col.Count(context.Background(), obKey)
if err != nil {
log.Panicf("failed to fetch count of providers: %s", err)
}
if cnt == 0 {
t.oTbl.deleteProvider(obKey.ExtKey)
} else {
t.oTbl.insertProvider(obKey.ExtKey)
}

entry := &providerData{}
err := t.col.FindOne(context.Background(), key, entry)
err = t.col.FindOne(context.Background(), key, entry)
if err != nil {
if errors.IsNotFound(err) {
return
Expand Down Expand Up @@ -119,6 +138,22 @@ func (t *ProviderTable) handleOwnerRelease(op string, wKey any) {
}
}

// Allow a reconciler controller to register and get notified for availability
// and unavailability of providers
func (t *ProviderTable) Register(name string, crtl reconciler.Controller) error {
return t.oTbl.Register(name, crtl)
}

// Get List of Providers
func (t *ProviderTable) GetProviderList() []any {
return t.oTbl.getProviderList()
}

// Checks if provider exists
func (t *ProviderTable) IsProviderAvailable(key any) bool {
return t.oTbl.isProviderAvailable(key)
}

// create provider based on the specified key, typically a string,
// Returns Provider handle, allowing to close the provider
func (t *ProviderTable) CreateProvider(ctx context.Context, extKey any) (*Provider, error) {
Expand Down Expand Up @@ -175,6 +210,15 @@ func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, e
col: col,
ctx: ctx,
cancelFn: cancelFn,
oTbl: &observerTable{
providers: make(map[any]struct{}),
},
}

err := table.oTbl.Initialize(ctx, table.oTbl)
if err != nil {
cancelFn()
return nil, err
}

matchDeleteStage := mongo.Pipeline{
Expand All @@ -188,7 +232,7 @@ func LocateProviderTableWithName(store db.Store, name string) (*ProviderTable, e
}

// watch only for delete notification of lock owner
err := ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease)
err = ownerTable.col.Watch(ctx, matchDeleteStage, table.handleOwnerRelease)
if err != nil {
cancelFn()
return nil, err
Expand Down
23 changes: 5 additions & 18 deletions sync/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"github.com/Prabhjot-Sethi/core/errors"
)

type myProviderKey struct {
Scope string
Name string
}

func Test_ProviderBaseTesting(t *testing.T) {
config := &db.MongoConfig{
Host: "localhost",
Expand Down Expand Up @@ -45,29 +40,21 @@ func Test_ProviderBaseTesting(t *testing.T) {
t.Errorf("failed to locate provider Table: %s", err)
}

key1 := &myProviderKey{
Scope: "scope-1",
Name: "test-key",
}

provider, err := tbl.CreateProvider(context.Background(), key1)
provider, err := tbl.CreateProvider(context.Background(), "test-key")
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
_, err = tbl.CreateProvider(context.Background(), key1)
providerDup, err := tbl.CreateProvider(context.Background(), "test-key")
if err != nil {
t.Errorf("failed to create Provider: %s", err)
}
key2 := &myProviderKey{
Scope: "scope-2",
Name: "test-key",
}

provider1, err := tbl.CreateProvider(context.Background(), key2)
provider1, err := tbl.CreateProvider(context.Background(), "test-key-1")
if err != nil {
t.Errorf("failed to acquire lock: %s", err)
t.Errorf("failed to create provider: %s", err)
}
_ = provider1.Close()
_ = providerDup.Close()
_ = provider.Close()

time.Sleep(2 * time.Second)
Expand Down