Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
)

Expand Down Expand Up @@ -60,11 +59,10 @@ type ManagerImpl struct {
parent Manager
controllers sync.Map
ctx context.Context
col db.StoreCollection
}

// callback registered with the data store
func (m *ManagerImpl) entryCallback(op string, wKey any) {
func (m *ManagerImpl) NotifyCallback(wKey any) {
// iterate over all the registered clients
m.controllers.Range(func(name, data any) bool {
crtl, ok := data.(*controllerData)
Expand All @@ -82,26 +80,20 @@ func (m *ManagerImpl) entryCallback(op string, wKey any) {
}

// Initialize the manager with context and relevant collection to work with
func (m *ManagerImpl) Initialize(ctx context.Context, col db.StoreCollection, parent Manager) error {
if m.col != nil {
func (m *ManagerImpl) Initialize(ctx context.Context, parent Manager) error {
if m.parent != nil {
return errors.Wrap(errors.AlreadyExists, "Initialization already done")
}

err := col.Watch(ctx, nil, m.entryCallback)
if err != nil {
return err
}

m.ctx = ctx
m.col = col
m.parent = parent

return nil
}

// register a controller with manager for reconciliation
func (m *ManagerImpl) Register(name string, crtl Controller) error {
if m.col == nil {
if m.parent == nil {
return errors.Wrap(errors.InvalidArgument, "manager is not initialized")
}
data := &controllerData{
Expand Down
7 changes: 6 additions & 1 deletion reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type MyTable struct {
col db.StoreCollection
}

func (t *MyTable) Callback(op string, wKey any) {
t.NotifyCallback(wKey)
}

func (t *MyTable) ReconcilerGetAllKeys() []any {
myKeys := []MyKeyObject{}
keys := []any{}
Expand Down Expand Up @@ -86,7 +90,8 @@ func performMongoSetup() {
table = &MyTable{}
table.col = col
_ = table.col.SetKeyType(reflect.TypeOf(&MyKey{}))
_ = table.Initialize(context.Background(), col, table)
_ = table.col.Watch(context.Background(), nil, table.Callback)
_ = table.Initialize(context.Background(), table)
}

func tearDownMongoSetup() {
Expand Down