From 40ce97b65b58e9ffc2622028e31268a1377e6fef Mon Sep 17 00:00:00 2001 From: Prabhjot Singh Sethi Date: Thu, 17 Apr 2025 11:06:56 +0000 Subject: [PATCH] Add reconciler logic for core infra Allowing database manager to have capability of allowing reconcilers using notification mechanisms Signed-off-by: Prabhjot Singh Sethi --- reconciler/notifier.go | 6 ++ reconciler/pipeline.go | 110 ++++++++++++++++++++++++++ reconciler/reconciler.go | 132 +++++++++++++++++++++++++++++++ reconciler/reconciler_test.go | 145 ++++++++++++++++++++++++++++++++++ 4 files changed, 393 insertions(+) create mode 100644 reconciler/notifier.go create mode 100644 reconciler/pipeline.go create mode 100644 reconciler/reconciler.go create mode 100644 reconciler/reconciler_test.go diff --git a/reconciler/notifier.go b/reconciler/notifier.go new file mode 100644 index 0000000..86e38fe --- /dev/null +++ b/reconciler/notifier.go @@ -0,0 +1,6 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package reconciler + +type ReconcilerFunc func(k any) (*Result, error) diff --git a/reconciler/pipeline.go b/reconciler/pipeline.go new file mode 100644 index 0000000..8e16703 --- /dev/null +++ b/reconciler/pipeline.go @@ -0,0 +1,110 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package reconciler + +import ( + "context" + "sync" + "time" +) + +// Since Reconciler Pipeline will be used across go routines, it is +// quite possible to have producers and consumers to work at +// different speeds with a possibility of having backlogs or causing +// holdups, thus by default use a buffer length of 1024 for every +// Pipeline to ensure producers can just work seemlessly under +// regular scenarios +// Note: this is expected to be consumed only locally +const bufferLength = 1024 + +// Pipeline of elements to be processed by reconciler upon notification +type Pipeline struct { + // context under which the pipeline is working + // where the context closure means the pipeline is stopped + ctx context.Context + + // map of entries to work with, here we are storing entries in a map + // to enable possibility of compressing notifications while trying + // to enqueue an entry which is already in pipeline + pMap sync.Map + + // Pipeline is internally built on a buffered channel internally + pChannel chan any + + // reconciler function to trigger while processing an entry in the + // pipeline + reconciler reconcilerFunc +} + +func (p *Pipeline) Enqueue(k any) error { + // do not allow if the context is already closed + if p.ctx.Err() != nil { + return p.ctx.Err() + } + + // load or store the entry to sync map, checking existence of the + // entry in the Pipeline, ensuring compressing multiple + // notifications for a single entry into one + // while the value stored is nil as we are treating this map more + // of as a set, where values do not hold relevance as of now + _, loaded := p.pMap.LoadOrStore(k, nil) + if !loaded { + // if entry didn't exist in the map, ensure pushing the same + // to the buffered channel for processing by reconciler + p.pChannel <- k + } + + return nil +} + +// initialize and start the pipeline processing +// internal function and should not be exposed outside +func (p *Pipeline) initialize() { + for { + select { + case <-p.ctx.Done(): + // pipeline processing is stopped return from here + return + case k := <-p.pChannel: + // process the entry available in the pipeline + // send it over to the reconciler for processing + // delete the key from the map while triggering + // the reconciler + p.pMap.Delete(k) + + // trigger the reconciler + res, err := p.reconciler(k) + if err != nil { + // there was an error while processing the entry + // requeue it at the back of the pipeline for + // processing later + _ = p.Enqueue(k) + } else { + if res != nil && res.RequeueAfter != 0 { + go func(k1 any) { + // requeue the entry after specified time + time.Sleep(res.RequeueAfter) + _ = p.Enqueue(k1) + }(k) + } + } + } + } +} + +// Creates a New Pipeline for queuing up and processing entries provided +// for reconciliation +func NewPipeline(ctx context.Context, fn reconcilerFunc) *Pipeline { + p := &Pipeline{ + ctx: ctx, + pMap: sync.Map{}, + pChannel: make(chan any, bufferLength), + reconciler: fn, + } + + // initialize the pipeline before passing it externally + // to start the core functionality + go p.initialize() + return p +} diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go new file mode 100644 index 0000000..988df2c --- /dev/null +++ b/reconciler/reconciler.go @@ -0,0 +1,132 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package reconciler + +import ( + "context" + "log" + "sync" + "time" + + "github.com/Prabhjot-Sethi/core/db" + "github.com/Prabhjot-Sethi/core/errors" +) + +// Taking motivation from kubernetes +// https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/reconcile/reconcile.go +// enable a reconciler function +type Result struct { + // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration. + RequeueAfter time.Duration +} + +type Request struct { + Key any +} + +type reconcilerFunc func(k any) (*Result, error) + +// controller interface meant for registering to database manager +// for processing changes inoccuring to varies entries in the database +type Controller interface { + Reconcile(k any) (*Result, error) +} + +// Controller data used for saving the context of a controller +// and corresponding information along with the reconciliation +// pipeline +type controllerData struct { + name string + handle Controller + pipeline *Pipeline +} + +// Manager interface for enforcing implementation of specific +// functions +type Manager interface { + // function to get all existing keys in the collection + ReconcilerGetAllKeys() []any + + // interface should not be embed by anyone directly + mustEmbedManagerImpl() +} + +// Manager implementation with implementation of the core logic +// typically built over and above database store on which it will +// offer reconcilation capabilities +type ManagerImpl struct { + Manager + parent Manager + controllers sync.Map + ctx context.Context + col db.StoreCollection +} + +// callback registered with the data store +func (m *ManagerImpl) entryCallback(op string, wKey any) { + // iterate over all the registered clients + m.controllers.Range(func(name, data any) bool { + crtl, ok := data.(*controllerData) + if !ok { + // this ideally should never happen + log.Panicln("Wrong data type of controller info received") + } + // enqueue the entry for reconciliation + err := crtl.pipeline.Enqueue(wKey) + if err != nil { + log.Panicln("Failed to enqueue an entry for reconciliation", name, err) + } + return true + }) +} + +// Initialize the manager with context and relevant collection to work with +func (m *ManagerImpl) Initialize(ctx context.Context, col db.StoreCollection, parent Manager) error { + if m.col != nil { + return errors.Wrap(errors.AlreadyExists, "Initialization already done") + } + + err := col.Watch(ctx, nil, m.entryCallback) + if err != nil { + return err + } + + m.ctx = ctx + m.col = col + m.parent = parent + + return nil +} + +// register a controller with manager for reconciliation +func (m *ManagerImpl) Register(name string, crtl Controller) error { + if m.col == nil { + return errors.Wrap(errors.InvalidArgument, "manager is not initialized") + } + data := &controllerData{ + name: name, + handle: crtl, + } + _, loaded := m.controllers.LoadOrStore(name, data) + if loaded { + return errors.Wrapf(errors.AlreadyExists, "Reconclier %s, already exists", name) + } + + // initiate a new pipeline for reconcilation triggers + data.pipeline = NewPipeline(m.ctx, crtl.Reconcile) + + // ensure triggering reconciliation of existing entries + // separately for reconciliation by the controller + go func() { + keys := m.parent.ReconcilerGetAllKeys() + for _, key := range keys { + err := data.pipeline.Enqueue(key) + if err != nil { + log.Panicln("failed to enqueue an entry from existing in the queue", err) + } + } + }() + + return nil +} diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go new file mode 100644 index 0000000..68e7584 --- /dev/null +++ b/reconciler/reconciler_test.go @@ -0,0 +1,145 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package reconciler + +import ( + "context" + "log" + "reflect" + "testing" + "time" + + "github.com/Prabhjot-Sethi/core/db" + "go.mongodb.org/mongo-driver/bson" +) + +type MyKey struct { + Name string +} + +type MyData struct { + Desc string +} + +type MyKeyObject struct { + Key *MyKey `bson:"_id,omitempty"` + Desc string +} + +type MyTable struct { + ManagerImpl + col db.StoreCollection +} + +func (t *MyTable) ReconcilerGetAllKeys() []any { + myKeys := []MyKeyObject{} + keys := []any{} + t.col.FindMany(context.Background(), nil, &myKeys) + for _, k := range myKeys { + keys = append(keys, k.Key) + } + return []any(keys) +} + +var table *MyTable + +func performMongoSetup() { + config := &db.MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := db.NewMongoClient(config) + + if err != nil { + log.Printf("failed to connect to mongo DB Error: %s", err) + return + } + + s := client.GetDataStore("test") + col := s.GetCollection("collection-reconciler") + + col.DeleteMany(context.Background(), bson.D{}) + + key := &MyKey{ + Name: "test-key-1", + } + data := &MyData{ + Desc: "sample-description", + } + + err = col.InsertOne(context.Background(), key, data) + if err != nil { + log.Printf("failed to insert an entry to collection Error: %s", err) + } + + table = &MyTable{} + table.col = col + table.col.SetKeyType(reflect.TypeOf(&MyKey{})) + table.Initialize(context.Background(), col, table) +} + +func tearDownMongoSetup() { + table.col.DeleteMany(context.Background(), bson.D{}) +} + +type MyController struct { + Controller + reEnqueue bool + notifications int +} + +func (c *MyController) Reconcile(k any) (*Result, error) { + key := k.(*MyKey) + if key.Name == "" { + log.Panicln("Got invalid key response") + } + c.notifications += 1 + if c.reEnqueue { + c.reEnqueue = false + return &Result{ + RequeueAfter: 1 * time.Second, + }, nil + } + return &Result{}, nil +} + +func Test_ReconcilerBaseValidations(t *testing.T) { + performMongoSetup() + + crtl := &MyController{} + + err := table.Register("test", crtl) + if err != nil { + t.Errorf("Got Error %s, while registering controller", err) + } + + time.Sleep(1 * time.Second) + if crtl.notifications != 1 { + t.Errorf("Got %d notifications, expected only 1", crtl.notifications) + } + + crtl.reEnqueue = true + + key := &MyKey{ + Name: "test-key-2", + } + data := &MyData{ + Desc: "sample-description", + } + + err = table.col.InsertOne(context.Background(), key, data) + if err != nil { + log.Printf("failed to insert an entry to collection Error: %s", err) + } + + time.Sleep(3 * time.Second) + if crtl.notifications != 3 { + t.Errorf("Got %d notifications, expected 3", crtl.notifications) + } + + tearDownMongoSetup() +}