diff --git a/table/cached_generic.go b/table/cached_generic.go new file mode 100644 index 0000000..cacb5af --- /dev/null +++ b/table/cached_generic.go @@ -0,0 +1,204 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package table + +import ( + "context" + "log" + "reflect" + + "github.com/go-core-stack/core/db" + "github.com/go-core-stack/core/errors" + "github.com/go-core-stack/core/reconciler" +) + +// CachedTable is a generic table type providing common functions and types to specific +// structures each table is built using. This table also ensure keeping an inmemory +// cache information to enable better responsiveness for critical path data fetch, where +// we are required to be consistent, but it is ok to let go some part of accuracy, +// assuming system will automatically converge as it settles down with change propagation. +// It also ensures sanity checks and provides common functionality for database-backed +// tables. +// +// K: Key type (must NOT be a pointer type, typically a struct or primitive) +// E: Entry type (must NOT be a pointer type) +type CachedTable[K comparable, E any] struct { + reconciler.ManagerImpl + cache map[K]*E + col db.StoreCollection +} + +// Initialize sets up the Table with the provided db.StoreCollection. +// It performs sanity checks on the entry and key types and registers the key type with the collection. +// Must be called before any other operation. +// +// Returns an error if the table is already initialized, the entry or key type is a pointer, +// or if the collection setup fails. +func (t *CachedTable[K, E]) Initialize(col db.StoreCollection) error { + if t.col != nil { + return errors.Wrapf(errors.AlreadyExists, "Table is already initialized") + } + + var e E + if reflect.TypeOf(e).Kind() == reflect.Pointer { + return errors.Wrapf(errors.InvalidArgument, "Table entry type must not be a pointer") + } + + var k K + if reflect.TypeOf(k).Kind() == reflect.Pointer { + return errors.Wrapf(errors.InvalidArgument, "Table key type must not be a pointer") + } + + err := col.SetKeyType(reflect.PointerTo(reflect.TypeOf(k))) + if err != nil { + return err + } + + // Register callback for collection changes + err = col.Watch(context.Background(), nil, t.callback) + if err != nil { + return err + } + + // Initialize the reconciler manager + err = t.ManagerImpl.Initialize(context.Background(), t) + if err != nil { + return err + } + + t.col = col + t.cache = map[K]*E{} + return nil +} + +// callback is invoked on collection changes and notifies the reconciler. +func (t *CachedTable[K, E]) callback(op string, wKey any) { + key, ok := wKey.(*K) + // failure should logically never happen, but lets handle just incase + if ok { + entry, err := t.DBFind(context.Background(), key) + if err != nil { + if errors.IsNotFound(err) { + // consider delete scenario + delete(t.cache, *key) + } else { + // this should not happen in regular scenarios + // log and return from here + log.Printf("failed to find an entry, got error: %s", err) + } + } else { + t.cache[*key] = entry + } + } + t.NotifyCallback(wKey) +} + +// ReconcilerGetAllKeys returns all keys in the table. +// Used by the reconciler to enumerate all managed entries. +func (t *CachedTable[K, E]) ReconcilerGetAllKeys() []any { + list := []keyOnly[K]{} + keys := []any{} + err := t.col.FindMany(context.Background(), nil, &list) + if err != nil { + log.Panicf("got error while fetching all keys %s", err) + } + for _, k := range list { + keys = append(keys, &k.Key) + } + return []any(keys) +} + +// Insert adds a new entry to the table with the given key. +// Returns an error if the table is not initialized or the insert fails. +func (t *CachedTable[K, E]) Insert(ctx context.Context, key *K, entry *E) error { + if t.col == nil { + return errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.InsertOne(ctx, key, entry) +} + +// Locate finds an entry by key, inserts it if it doesn't exist, or updates it if it does. +// Returns an error if the table is not initialized or the operation fails. +func (t *CachedTable[K, E]) Locate(ctx context.Context, key *K, entry *E) error { + if t.col == nil { + return errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.UpdateOne(ctx, key, entry, true) +} + +// Update modifies an existing entry with the given key. +// Returns an error if the table is not initialized or the update fails. +func (t *CachedTable[K, E]) Update(ctx context.Context, key *K, entry *E) error { + if t.col == nil { + return errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.UpdateOne(ctx, key, entry, false) +} + +// Find retrieves an entry by key from the Cache +// Returns the entry and error if not found or if the table is not initialized. +func (t *CachedTable[K, E]) Find(ctx context.Context, key *K) (*E, error) { + entry, ok := t.cache[*key] + if !ok { + return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v", key) + } + return entry, nil +} + +// DBFind retrieves an entry by key from the Database +// Returns the entry and error if not found or if the table is not initialized. +func (t *CachedTable[K, E]) DBFind(ctx context.Context, key *K) (*E, error) { + var data E + if t.col == nil { + return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + err := t.col.FindOne(ctx, key, &data) + if err != nil { + return nil, errors.Wrapf(errors.NotFound, "failed to find entry with key %v: %s", key, err) + } + return &data, err +} + +// DBFindMany retrieves multiple entries matching the provided filter from database. +// Returns a slice of entries and error if none found or if the table is not initialized. +func (t *CachedTable[K, E]) DBFindMany(ctx context.Context, filter any) ([]*E, error) { + if t.col == nil { + return nil, errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + var data []*E + err := t.col.FindMany(ctx, filter, &data) + if err != nil { + return nil, errors.Wrapf(errors.NotFound, "failed to find any entry: %s", err) + } + + return data, nil +} + +// Count retrieves count of entries matching the provided filter. +// Returns count of entries and error if none found or if the table is not initialized. +func (t *CachedTable[K, E]) Count(ctx context.Context, filter any) (int64, error) { + if t.col == nil { + return 0, errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.Count(ctx, filter) +} + +// DeleteByFilter deletes entries matching the provided filter. +// Returns number of entries deleted and error if any +func (t *CachedTable[K, E]) DeleteByFilter(ctx context.Context, filter any) (int64, error) { + if t.col == nil { + return 0, errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.DeleteMany(ctx, filter) + +} + +// DeleteKey removes an entry by key from the table. +// Returns an error if the table is not initialized or the delete fails. +func (t *CachedTable[K, E]) DeleteKey(ctx context.Context, key *K) error { + if t.col == nil { + return errors.Wrapf(errors.InvalidArgument, "Table not initialized") + } + return t.col.DeleteOne(ctx, key) +} diff --git a/table/cached_generic_test.go b/table/cached_generic_test.go new file mode 100644 index 0000000..78b23ca --- /dev/null +++ b/table/cached_generic_test.go @@ -0,0 +1,233 @@ +// Copyright © 2025 Prabhjot Singh Sethi, All Rights reserved +// Author: Prabhjot Singh Sethi + +package table + +import ( + "context" + "log" + "testing" + "time" + + "github.com/go-core-stack/core/db" + "go.mongodb.org/mongo-driver/v2/bson" +) + +type MyKey struct { + Name string +} + +type InternaData struct { + Test string +} + +type MyData struct { + Desc string + Val *InternaData +} + +type MyTable struct { + CachedTable[MyKey, MyData] +} + +var ( + myTable *MyTable +) + +func clientInit() { + if myTable != nil { + return + } + myTable = &MyTable{} + + config := &db.MongoConfig{ + Host: "localhost", + Port: "27017", + Username: "root", + Password: "password", + } + + client, err := db.NewMongoClient(config) + + if err != nil { + log.Panicf("failed to connect to mongo DB Error: %s", err) + } + + err = client.HealthCheck(context.Background()) + if err != nil { + log.Panicf("failed to perform Health check with DB Error: %s", err) + } + + s := client.GetDataStore("test") + + col := s.GetCollection("my-cached-table") + + err = myTable.Initialize(col) + if err != nil { + log.Panicf("failed to initialize cached table") + } +} + +func Test_CachedClient(t *testing.T) { + clientInit() + t.Run("push_and_find_entries", func(t *testing.T) { + + key := &MyKey{ + Name: "test-key-1", + } + data := &MyData{ + Desc: "sample-description-1", + Val: &InternaData{ + Test: "abc-1", + }, + } + + ctx := context.Background() + + err := myTable.Insert(ctx, key, data) + if err != nil { + t.Errorf("failed inserting entry to the table, got error: %s", err) + } + + // second insert with same key should fail + err = myTable.Insert(ctx, key, data) + if err == nil { + t.Errorf("second insert for same entry to the table succeeded, expeted error") + } + + key2 := &MyKey{ + Name: "test-key-2", + } + data2 := &MyData{ + Desc: "sample-description-2", + Val: &InternaData{ + Test: "abc-2", + }, + } + + err = myTable.Insert(ctx, key2, data2) + if err != nil { + t.Errorf("failed inserting second entry to the table, got error: %s", err) + } + + // add a sleep timer to ensure that the processing of the context is completed + // for the cache table + time.Sleep(1 * time.Second) + + entry, err := myTable.Find(ctx, key) + if err != nil { + t.Errorf("failed to find the inserted entry from the table, got error: %s", err) + } else { + if entry.Desc != "sample-description-1" { + t.Errorf("expected sample-description-1, but got %s", entry.Desc) + } + } + + entry, err = myTable.Find(ctx, key2) + if err != nil { + t.Errorf("failed to find the inserted entry from the table, got error: %s", err) + } else { + if entry.Desc != "sample-description-2" { + t.Errorf("expected sample-description-2, but got %s", entry.Desc) + } + } + + count, err := myTable.col.DeleteMany(ctx, bson.D{}) + if err != nil { + t.Errorf("failed to delete the entries from table, got error %s", err) + } else { + if count != 2 { + t.Errorf("expected delete of two entries from table, but got %d", count) + } + } + }) + + t.Run("find_updated_result", func(t *testing.T) { + + key := &MyKey{ + Name: "test-key-1", + } + data := &MyData{ + Desc: "sample-description-1", + Val: &InternaData{ + Test: "abc-1", + }, + } + + ctx := context.Background() + + err := myTable.Insert(ctx, key, data) + if err != nil { + t.Errorf("failed inserting entry to the table, got error: %s", err) + } + + // second insert with same key should fail + err = myTable.Insert(ctx, key, data) + if err == nil { + t.Errorf("second insert for same entry to the table succeeded, expeted error") + } + + key2 := &MyKey{ + Name: "test-key-2", + } + data2 := &MyData{ + Desc: "sample-description-2", + Val: &InternaData{ + Test: "abc-2", + }, + } + + err = myTable.Insert(ctx, key2, data2) + if err != nil { + t.Errorf("failed inserting second entry to the table, got error: %s", err) + } + + // add a sleep timer to ensure that the processing of the context is completed + // for the cache table + time.Sleep(1 * time.Second) + + entry, err := myTable.Find(ctx, key) + if err != nil { + t.Errorf("failed to find the inserted entry from the table, got error: %s", err) + } else { + if entry.Desc != "sample-description-1" { + t.Errorf("expected sample-description-1, but got %s", entry.Desc) + } + } + + // trigger update + data3 := &MyData{ + Desc: "sample-description-3", + Val: &InternaData{ + Test: "abc-1", + }, + } + + err = myTable.Update(ctx, key, data3) + if err != nil { + t.Errorf("failed to update data into cached table, got error: %s", err) + } + + // add a sleep timer to ensure that the processing of the context is completed + // for the cache table + time.Sleep(1 * time.Second) + + entry, err = myTable.Find(ctx, key) + if err != nil { + t.Errorf("failed to find the inserted entry from the table, got error: %s", err) + } else { + if entry.Desc != "sample-description-3" { + t.Errorf("expected sample-description-3, but got %s", entry.Desc) + } + } + + count, err := myTable.col.DeleteMany(ctx, bson.D{}) + if err != nil { + t.Errorf("failed to delete the entries from table, got error %s", err) + } else { + if count != 2 { + t.Errorf("expected delete of two entries from table, but got %d", count) + } + } + }) +}