Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions reconciler/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package reconciler

type ReconcilerFunc func(k any) (*Result, error)
110 changes: 110 additions & 0 deletions reconciler/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package reconciler

import (
"context"
"sync"
"time"
)

// Since Reconciler Pipeline will be used across go routines, it is
// quite possible to have producers and consumers to work at
// different speeds with a possibility of having backlogs or causing
// holdups, thus by default use a buffer length of 1024 for every
// Pipeline to ensure producers can just work seemlessly under
// regular scenarios
// Note: this is expected to be consumed only locally
const bufferLength = 1024

// Pipeline of elements to be processed by reconciler upon notification
type Pipeline struct {
// context under which the pipeline is working
// where the context closure means the pipeline is stopped
ctx context.Context

// map of entries to work with, here we are storing entries in a map
// to enable possibility of compressing notifications while trying
// to enqueue an entry which is already in pipeline
pMap sync.Map

// Pipeline is internally built on a buffered channel internally
pChannel chan any

// reconciler function to trigger while processing an entry in the
// pipeline
reconciler reconcilerFunc
}

func (p *Pipeline) Enqueue(k any) error {
// do not allow if the context is already closed
if p.ctx.Err() != nil {
return p.ctx.Err()
}

// load or store the entry to sync map, checking existence of the
// entry in the Pipeline, ensuring compressing multiple
// notifications for a single entry into one
// while the value stored is nil as we are treating this map more
// of as a set, where values do not hold relevance as of now
_, loaded := p.pMap.LoadOrStore(k, nil)
if !loaded {
// if entry didn't exist in the map, ensure pushing the same
// to the buffered channel for processing by reconciler
p.pChannel <- k
}

return nil
}

// initialize and start the pipeline processing
// internal function and should not be exposed outside
func (p *Pipeline) initialize() {
for {
select {
case <-p.ctx.Done():
// pipeline processing is stopped return from here
return
case k := <-p.pChannel:
// process the entry available in the pipeline
// send it over to the reconciler for processing
// delete the key from the map while triggering
// the reconciler
p.pMap.Delete(k)

// trigger the reconciler
res, err := p.reconciler(k)
if err != nil {
// there was an error while processing the entry
// requeue it at the back of the pipeline for
// processing later
_ = p.Enqueue(k)
} else {
if res != nil && res.RequeueAfter != 0 {
go func(k1 any) {
// requeue the entry after specified time
time.Sleep(res.RequeueAfter)
_ = p.Enqueue(k1)
}(k)
}
}
}
}
}

// Creates a New Pipeline for queuing up and processing entries provided
// for reconciliation
func NewPipeline(ctx context.Context, fn reconcilerFunc) *Pipeline {
p := &Pipeline{
ctx: ctx,
pMap: sync.Map{},
pChannel: make(chan any, bufferLength),
reconciler: fn,
}

// initialize the pipeline before passing it externally
// to start the core functionality
go p.initialize()
return p
}
132 changes: 132 additions & 0 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package reconciler

import (
"context"
"log"
"sync"
"time"

"github.com/Prabhjot-Sethi/core/db"
"github.com/Prabhjot-Sethi/core/errors"
)

// Taking motivation from kubernetes
// https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/reconcile/reconcile.go
// enable a reconciler function
type Result struct {
// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
RequeueAfter time.Duration
}

type Request struct {
Key any
}

type reconcilerFunc func(k any) (*Result, error)

// controller interface meant for registering to database manager
// for processing changes inoccuring to varies entries in the database
type Controller interface {
Reconcile(k any) (*Result, error)
}

// Controller data used for saving the context of a controller
// and corresponding information along with the reconciliation
// pipeline
type controllerData struct {
name string
handle Controller
pipeline *Pipeline
}

// Manager interface for enforcing implementation of specific
// functions
type Manager interface {
// function to get all existing keys in the collection
ReconcilerGetAllKeys() []any

// interface should not be embed by anyone directly
mustEmbedManagerImpl()
}

// Manager implementation with implementation of the core logic
// typically built over and above database store on which it will
// offer reconcilation capabilities
type ManagerImpl struct {
Manager
parent Manager
controllers sync.Map
ctx context.Context
col db.StoreCollection
}

// callback registered with the data store
func (m *ManagerImpl) entryCallback(op string, wKey any) {
// iterate over all the registered clients
m.controllers.Range(func(name, data any) bool {
crtl, ok := data.(*controllerData)
if !ok {
// this ideally should never happen
log.Panicln("Wrong data type of controller info received")
}
// enqueue the entry for reconciliation
err := crtl.pipeline.Enqueue(wKey)
if err != nil {
log.Panicln("Failed to enqueue an entry for reconciliation", name, err)
}
return true
})
}

// Initialize the manager with context and relevant collection to work with
func (m *ManagerImpl) Initialize(ctx context.Context, col db.StoreCollection, parent Manager) error {
if m.col != nil {
return errors.Wrap(errors.AlreadyExists, "Initialization already done")
}

err := col.Watch(ctx, nil, m.entryCallback)
if err != nil {
return err
}

m.ctx = ctx
m.col = col
m.parent = parent

return nil
}

// register a controller with manager for reconciliation
func (m *ManagerImpl) Register(name string, crtl Controller) error {
if m.col == nil {
return errors.Wrap(errors.InvalidArgument, "manager is not initialized")
}
data := &controllerData{
name: name,
handle: crtl,
}
_, loaded := m.controllers.LoadOrStore(name, data)
if loaded {
return errors.Wrapf(errors.AlreadyExists, "Reconclier %s, already exists", name)
}

// initiate a new pipeline for reconcilation triggers
data.pipeline = NewPipeline(m.ctx, crtl.Reconcile)

// ensure triggering reconciliation of existing entries
// separately for reconciliation by the controller
go func() {
keys := m.parent.ReconcilerGetAllKeys()
for _, key := range keys {
err := data.pipeline.Enqueue(key)
if err != nil {
log.Panicln("failed to enqueue an entry from existing in the queue", err)
}
}
}()

return nil
}
145 changes: 145 additions & 0 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved
// Author: Prabhjot Singh Sethi <prabhjot.sethi@gmail.com>

package reconciler

import (
"context"
"log"
"reflect"
"testing"
"time"

"github.com/Prabhjot-Sethi/core/db"
"go.mongodb.org/mongo-driver/bson"
)

type MyKey struct {
Name string
}

type MyData struct {
Desc string
}

type MyKeyObject struct {
Key *MyKey `bson:"_id,omitempty"`
Desc string
}

type MyTable struct {
ManagerImpl
col db.StoreCollection
}

func (t *MyTable) ReconcilerGetAllKeys() []any {
myKeys := []MyKeyObject{}
keys := []any{}
t.col.FindMany(context.Background(), nil, &myKeys)
for _, k := range myKeys {
keys = append(keys, k.Key)
}
return []any(keys)
}

var table *MyTable

func performMongoSetup() {
config := &db.MongoConfig{
Host: "localhost",
Port: "27017",
Username: "root",
Password: "password",
}

client, err := db.NewMongoClient(config)

if err != nil {
log.Printf("failed to connect to mongo DB Error: %s", err)
return
}

s := client.GetDataStore("test")
col := s.GetCollection("collection-reconciler")

col.DeleteMany(context.Background(), bson.D{})

key := &MyKey{
Name: "test-key-1",
}
data := &MyData{
Desc: "sample-description",
}

err = col.InsertOne(context.Background(), key, data)
if err != nil {
log.Printf("failed to insert an entry to collection Error: %s", err)
}

table = &MyTable{}
table.col = col
table.col.SetKeyType(reflect.TypeOf(&MyKey{}))
table.Initialize(context.Background(), col, table)
}

func tearDownMongoSetup() {
table.col.DeleteMany(context.Background(), bson.D{})
}

type MyController struct {
Controller
reEnqueue bool
notifications int
}

func (c *MyController) Reconcile(k any) (*Result, error) {
key := k.(*MyKey)
if key.Name == "" {
log.Panicln("Got invalid key response")
}
c.notifications += 1
if c.reEnqueue {
c.reEnqueue = false
return &Result{
RequeueAfter: 1 * time.Second,
}, nil
}
return &Result{}, nil
}

func Test_ReconcilerBaseValidations(t *testing.T) {
performMongoSetup()

crtl := &MyController{}

err := table.Register("test", crtl)
if err != nil {
t.Errorf("Got Error %s, while registering controller", err)
}

time.Sleep(1 * time.Second)
if crtl.notifications != 1 {
t.Errorf("Got %d notifications, expected only 1", crtl.notifications)
}

crtl.reEnqueue = true

key := &MyKey{
Name: "test-key-2",
}
data := &MyData{
Desc: "sample-description",
}

err = table.col.InsertOne(context.Background(), key, data)
if err != nil {
log.Printf("failed to insert an entry to collection Error: %s", err)
}

time.Sleep(3 * time.Second)
if crtl.notifications != 3 {
t.Errorf("Got %d notifications, expected 3", crtl.notifications)
}

tearDownMongoSetup()
}