From cc2e97f3c2d4dc157238f08b8c0869136c15e6d6 Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Fri, 22 Aug 2025 22:59:36 +0300 Subject: [PATCH] feat(backend,dist-http): add internal HTTP server & HTTP transport with context-aware replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce internal Fiber-based HTTP server (set/get/remove, health) and HTTP transport for cross-node forwarding. Refactor DistMemory to auto-start server/transport when nodeAddr set, add heartbeat start helper, and split initialization into helper methods (ensureShardConfig, initMembershipIfNeeded, tryStartHTTP, startHeartbeatIfEnabled). Add context propagation through ForwardSet/ForwardRemove, replication, repair, and internal applySet/applyRemove paths; enhance read-repair to use context. Replace direct JSON decoding in get path with map+mirror struct to satisfy lint (musttag) and wrap all network/JSON errors via ewrap. Extend DistTransport interface to include context in write/remove operations. Introduce health probe, structured error wrapping, and best-effort graceful shutdown in Stop. Switch tests (and cmap test) to goccy/go-json for consistency and performance. Update cspell word list (fctx, hreq, lamport). Add Lamport-like version counter note and clarify replication semantics. Key changes: New files: dist_http_server.go, dist_http_transport.go, dist_http_types.go DistMemory: added httpServer field, context-aware Set/Remove/replication, helper init methods DistTransport: ForwardSet/ForwardRemove now accept context InProcessTransport adjusted to new interface Added health endpoint and internal request DTO consolidation Improved error handling & lint compliance (errcheck, wrapcheck, noctx, musttag, etc.) BREAKING CHANGE: DistTransport interface now requires context for ForwardSet and ForwardRemove; internal helpers applySet/applyRemove and replicateTo signatures changed accordingly. 
External custom transport implementations must be updated. --- cspell.config.yaml | 3 + pkg/backend/dist_http_server.go | 138 +++++++++++++++++ pkg/backend/dist_http_transport.go | 241 +++++++++++++++++++++++++++++ pkg/backend/dist_http_types.go | 25 +++ pkg/backend/dist_memory.go | 164 ++++++++++++++------ pkg/cache/cmap_test.go | 3 +- tests/hypercache_mgmt_dist_test.go | 3 +- tests/management_http_test.go | 2 +- 8 files changed, 525 insertions(+), 54 deletions(-) create mode 100644 pkg/backend/dist_http_server.go create mode 100644 pkg/backend/dist_http_transport.go create mode 100644 pkg/backend/dist_http_types.go diff --git a/cspell.config.yaml b/cspell.config.yaml index d9cc71b..9ba336c 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -27,6 +27,7 @@ words: - excludeonly - exhaustruct - Fanout + - fctx - forcetypeassert - Fprintf - Fprintln @@ -44,8 +45,10 @@ words: - GOPATH - gosec - honnef + - hreq - ireturn - Itemm + - lamport - LFUDA - localmodule - logrus diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go new file mode 100644 index 0000000..be8596f --- /dev/null +++ b/pkg/backend/dist_http_server.go @@ -0,0 +1,138 @@ +package backend + +import ( + "context" + "net" + "strconv" + "time" + + "github.com/goccy/go-json" + + "github.com/hyp3rd/ewrap" + + fiber "github.com/gofiber/fiber/v3" + + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +type distHTTPServer struct { + app *fiber.App + ln net.Listener + addr string +} + +// minimal request/response types reused by transport +// request/response DTOs defined in dist_http_types.go + +const ( + httpReadTimeout = 5 * time.Second + httpWriteTimeout = 5 * time.Second +) + +func newDistHTTPServer(addr string) *distHTTPServer { + app := fiber.New(fiber.Config{ReadTimeout: httpReadTimeout, WriteTimeout: httpWriteTimeout}) + + return &distHTTPServer{app: app, addr: addr} +} + +func (s *distHTTPServer) start(ctx context.Context, dm *DistMemory) error { //nolint:ireturn + // 
routes + // set + // POST /internal/cache/set + // body: httpSetRequest + s.app.Post("/internal/cache/set", func(fctx fiber.Ctx) error { + var req httpSetRequest + + err := json.Unmarshal(fctx.Body(), &req) + if err != nil { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": err.Error()}) + } + + it := &cache.Item{Key: req.Key, Value: req.Value, Expiration: time.Duration(req.Expiration) * time.Millisecond, Version: req.Version, Origin: req.Origin} + if req.Replicate { + dm.applySet(ctx, it, true) + + return fctx.JSON(httpSetResponse{}) + } + + dm.applySet(ctx, it, false) + + return fctx.JSON(httpSetResponse{}) + }) + + s.app.Get("/internal/cache/get", func(fctx fiber.Ctx) error { + key := fctx.Query("key") + if key == "" { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "missing key"}) + } + + owners := dm.lookupOwners(key) + if len(owners) == 0 { + return fctx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "not owner"}) + } + + if it, ok := dm.shardFor(key).items.Get(key); ok { + return fctx.JSON(httpGetResponse{Found: true, Item: it}) + } + + return fctx.JSON(httpGetResponse{Found: false}) + }) + + s.app.Delete("/internal/cache/remove", func(fctx fiber.Ctx) error { + key := fctx.Query("key") + if key == "" { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "missing key"}) + } + + replicateRaw := fctx.Query("replicate", "false") + + replicate, parseErr := strconv.ParseBool(replicateRaw) + if parseErr != nil { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid replicate"}) + } + + dm.applyRemove(ctx, key, replicate) + + return fctx.SendStatus(fiber.StatusOK) + }) + + // reuse /health in management server or provide inline + s.app.Get("/health", func(fctx fiber.Ctx) error { + return fctx.SendString("ok") + }) + + lc := net.ListenConfig{} + + ln, err := lc.Listen(ctx, "tcp", s.addr) + if err != nil { + return ewrap.Wrap(err, "dist http listen") + } + + s.ln = ln + + go func() 
{ + err = s.app.Listener(ln) + if err != nil { + return + } + }() + + return nil +} + +func (s *distHTTPServer) stop(ctx context.Context) error { //nolint:ireturn + if s == nil || s.ln == nil { + return nil + } + + ch := make(chan error, 1) + + go func() { ch <- s.app.Shutdown() }() + + select { + case <-ctx.Done(): + return ewrap.Newf("http server shutdown timeout") + case err := <-ch: + return err + } +} diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go new file mode 100644 index 0000000..d43bc27 --- /dev/null +++ b/pkg/backend/dist_http_transport.go @@ -0,0 +1,241 @@ +// Package backend provides backend implementations including a distributed HTTP transport. +package backend + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/goccy/go-json" + + "github.com/hyp3rd/ewrap" + + "github.com/hyp3rd/hypercache/internal/sentinel" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// DistHTTPTransport implements DistTransport over HTTP JSON. +type DistHTTPTransport struct { // minimal MVP + client *http.Client + baseURLFn func(nodeID string) (string, bool) // resolves nodeID -> base URL (scheme+host) +} + +// internal status code threshold for error classification. +const statusThreshold = 300 + +// NewDistHTTPTransport creates a new HTTP transport. +func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport { + if timeout <= 0 { + timeout = 2 * time.Second + } + + return &DistHTTPTransport{ + client: &http.Client{Timeout: timeout}, + baseURLFn: resolver, + } +} + +// request/response DTOs moved to dist_http_types.go + +// ForwardSet forwards a set (or replication) request to a remote node. 
+func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return sentinel.ErrBackendNotFound + } + + reqBody := httpSetRequest{Key: item.Key, Value: item.Value, Expiration: item.Expiration.Milliseconds(), Version: item.Version, Origin: item.Origin, Replicate: replicate} + + payloadBytes, err := json.Marshal(&reqBody) + if err != nil { + return ewrap.Wrap(err, "marshal set request") + } + + url := base + "/internal/cache/set" + + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payloadBytes)) // background ctx; caller handles outer deadline + if err != nil { + return ewrap.Wrap(err, "new request") + } + + hreq.Header.Set("Content-Type", "application/json") + + resp, err := t.client.Do(hreq) + if err != nil { + return ewrap.Wrap(err, "do request") + } + + defer func() { + _ = resp.Body.Close() //nolint:errcheck // best-effort + }() + + if resp.StatusCode == http.StatusNotFound { + return sentinel.ErrBackendNotFound + } + + const statusThreshold = 300 // local redeclare for linter clarity + if resp.StatusCode >= statusThreshold { + body, rerr := io.ReadAll(resp.Body) + if rerr != nil { + return ewrap.Wrap(rerr, "read error body") + } + + return ewrap.Newf("forward set status %d body %s", resp.StatusCode, string(body)) + } + + return nil +} + +// type httpGetResponse formerly used for direct decoding; replaced by map-based decoding to satisfy linters. + +// ForwardGet fetches an item from a remote node. 
+func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, false, sentinel.ErrBackendNotFound + } + + url := fmt.Sprintf("%s/internal/cache/get?key=%s", base, key) + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, false, ewrap.Wrap(err, "new request") + } + + resp, err := t.client.Do(hreq) + if err != nil { + return nil, false, ewrap.Wrap(err, "do request") + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + + if resp.StatusCode == http.StatusNotFound { + return nil, false, sentinel.ErrBackendNotFound + } + + if resp.StatusCode != http.StatusOK { + return nil, false, ewrap.Newf("forward get status %d", resp.StatusCode) + } + + item, found, derr := decodeGetBody(resp.Body) + if derr != nil { + return nil, false, derr + } + + if !found { + return nil, false, nil + } + + return item, true, nil +} + +// decodeGetBody decodes a get response body into an item. 
+func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn + var raw map[string]json.RawMessage + + dec := json.NewDecoder(r) + + err := dec.Decode(&raw) + if err != nil { + return nil, false, ewrap.Wrap(err, "decode body") + } + + var found bool + if fb, ok := raw["found"]; ok { + err := json.Unmarshal(fb, &found) + if err != nil { + return nil, false, ewrap.Wrap(err, "unmarshal found") + } + } + + if !found { + return nil, false, nil + } + + if ib, ok := raw["item"]; ok && len(ib) > 0 { + // define minimal mirror struct to ensure json tags present (satisfy musttag) + var mirror struct { + Key string `json:"key"` + Value json.RawMessage `json:"value"` + Expiration int64 `json:"expiration"` + Version uint64 `json:"version"` + Origin string `json:"origin"` + } + + err := json.Unmarshal(ib, &mirror) + if err != nil { + return nil, false, ewrap.Wrap(err, "unmarshal mirror") + } + // reconstruct cache.Item (we ignore expiration formatting difference vs ms) + return &cache.Item{Key: mirror.Key, Value: mirror.Value, Expiration: time.Duration(mirror.Expiration) * time.Millisecond, Version: mirror.Version, Origin: mirror.Origin}, true, nil + } + + return &cache.Item{}, true, nil +} + +// ForwardRemove forwards a remove operation. 
+func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return sentinel.ErrBackendNotFound + } + + url := fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate) + + hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + if err != nil { + return ewrap.Wrap(err, "new request") + } + + resp, err := t.client.Do(hreq) + if err != nil { + return ewrap.Wrap(err, "do request") + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + + if resp.StatusCode == http.StatusNotFound { + return sentinel.ErrBackendNotFound + } + + if resp.StatusCode >= statusThreshold { + return ewrap.Newf("forward remove status %d", resp.StatusCode) + } + + return nil +} + +// Health probes a remote node health endpoint. +func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return sentinel.ErrBackendNotFound + } + + url := base + "/health" + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return ewrap.Wrap(err, "new request") + } + + resp, err := t.client.Do(hreq) + if err != nil { + return ewrap.Wrap(err, "do request") + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + + if resp.StatusCode == http.StatusNotFound { + return sentinel.ErrBackendNotFound + } + + if resp.StatusCode >= statusThreshold { + return ewrap.Newf("health status %d", resp.StatusCode) + } + + return nil +} diff --git a/pkg/backend/dist_http_types.go b/pkg/backend/dist_http_types.go new file mode 100644 index 0000000..69968c3 --- /dev/null +++ b/pkg/backend/dist_http_types.go @@ -0,0 +1,25 @@ +package backend + +import ( + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// Shared HTTP request/response DTOs for distributed transport/server. 
+type httpSetRequest struct { // shared with server & transport + Key string `json:"key"` + Value any `json:"value"` + Expiration int64 `json:"expiration_ms"` + Version uint64 `json:"version"` + Origin string `json:"origin"` + Replicate bool `json:"replicate"` +} + +type httpSetResponse struct { + Error string `json:"error,omitempty"` +} + +type httpGetResponse struct { + Found bool `json:"found"` + Item *cache.Item `json:"item,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 1e56f9f..da71e7c 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -29,6 +29,7 @@ type DistMemory struct { membership *cluster.Membership ring *cluster.Ring transport DistTransport + httpServer *distHTTPServer // optional internal HTTP server metrics distMetrics // configuration (static for now, future: dynamic membership/gossip) replication int @@ -47,7 +48,6 @@ type DistMemory struct { readConsistency ConsistencyLevel writeConsistency ConsistencyLevel versionCounter uint64 // global monotonic for this node (lamport-like) - // versionClock uint64 // lamport-like counter for primary writes } // ConsistencyLevel defines read/write consistency semantics. 
@@ -173,33 +173,17 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist opt(dm) } - if dm.shardCount <= 0 { - dm.shardCount = defaultDistShardCount - } - - for range dm.shardCount { // Go 1.22+ range-over-int - dm.shards = append(dm.shards, &distShard{items: cache.New()}) - } - - if dm.membership == nil { - dm.initStandaloneMembership() - } else { - if dm.localNode == nil { - dm.localNode = cluster.NewNode("", "local") - } - - dm.membership.Upsert(dm.localNode) - dm.ring = dm.membership.Ring() - } - - if dm.hbInterval > 0 && dm.transport != nil { - dm.stopCh = make(chan struct{}) - go dm.heartbeatLoop(ctx) - } + dm.ensureShardConfig() + dm.initMembershipIfNeeded() + dm.tryStartHTTP(ctx) + dm.startHeartbeatIfEnabled(ctx) return dm, nil } +// ensureShardConfig initializes shards respecting configured shardCount. +// helper methods relocated after exported methods for lint ordering. + // Capacity returns logical capacity. func (dm *DistMemory) Capacity() int { return dm.capacity } @@ -247,7 +231,7 @@ func (dm *DistMemory) Get(ctx context.Context, key string) (*cache.Item, bool) { } // Set stores item. 
-func (dm *DistMemory) Set(_ context.Context, item *cache.Item) error { //nolint:ireturn +func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { //nolint:ireturn err := item.Valid() if err != nil { return err @@ -259,7 +243,7 @@ func (dm *DistMemory) Set(_ context.Context, item *cache.Item) error { //nolint: } if owners[0] != dm.localNode.ID { // attempt forward; may promote - proceedAsPrimary, ferr := dm.handleForwardPrimary(owners, item) + proceedAsPrimary, ferr := dm.handleForwardPrimary(ctx, owners, item) if ferr != nil { return ferr } @@ -272,9 +256,9 @@ func (dm *DistMemory) Set(_ context.Context, item *cache.Item) error { //nolint: // primary path: assign version item.Version = atomic.AddUint64(&dm.versionCounter, 1) item.Origin = string(dm.localNode.ID) - dm.applySet(item, false) + dm.applySet(ctx, item, false) - acks := 1 + dm.replicateTo(item, owners[1:]) + acks := 1 + dm.replicateTo(ctx, item, owners[1:]) needed := dm.requiredAcks(len(owners), dm.writeConsistency) if acks < needed { @@ -300,10 +284,10 @@ func (dm *DistMemory) List(_ context.Context, _ ...IFilter) ([]*cache.Item, erro } // Remove deletes keys. -func (dm *DistMemory) Remove(_ context.Context, keys ...string) error { //nolint:ireturn +func (dm *DistMemory) Remove(ctx context.Context, keys ...string) error { //nolint:ireturn for _, key := range keys { if dm.isOwner(key) { // primary path - dm.applyRemove(key, true) + dm.applyRemove(ctx, key, true) continue } @@ -318,7 +302,7 @@ func (dm *DistMemory) Remove(_ context.Context, keys ...string) error { //nolint } atomic.AddInt64(&dm.metrics.forwardRemove, 1) - _ = dm.transport.ForwardRemove(string(owners[0]), key, true) //nolint:errcheck // best-effort + _ = dm.transport.ForwardRemove(ctx, string(owners[0]), key, true) //nolint:errcheck // best-effort } return nil @@ -377,9 +361,9 @@ func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID { // DistTransport defines forwarding operations needed by DistMemory. 
type DistTransport interface { - ForwardSet(nodeID string, item *cache.Item, replicate bool) error + ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) - ForwardRemove(nodeID string, key string, replicate bool) error + ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error Health(ctx context.Context, nodeID string) error } @@ -402,13 +386,13 @@ func (t *InProcessTransport) Register(b *DistMemory) { func (t *InProcessTransport) Unregister(id string) { delete(t.backends, id) } // ForwardSet forwards a set operation to the specified backend node. -func (t *InProcessTransport) ForwardSet(nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn +func (t *InProcessTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn b, ok := t.backends[nodeID] if !ok { return sentinel.ErrBackendNotFound } // direct apply bypasses ownership check (already routed) - b.applySet(item, replicate) + b.applySet(ctx, item, replicate) return nil } @@ -429,13 +413,13 @@ func (t *InProcessTransport) ForwardGet(_ context.Context, nodeID string, key st } // ForwardRemove forwards a remove operation to the specified backend node. -func (t *InProcessTransport) ForwardRemove(nodeID string, key string, replicate bool) error { //nolint:ireturn +func (t *InProcessTransport) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error { //nolint:ireturn b, ok := t.backends[nodeID] if !ok { return sentinel.ErrBackendNotFound } - b.applyRemove(key, replicate) + b.applyRemove(ctx, key, replicate) return nil } @@ -503,14 +487,92 @@ func (dm *DistMemory) Metrics() DistMetrics { } // Stop stops heartbeat loop if running. 
-func (dm *DistMemory) Stop(_ context.Context) error { //nolint:ireturn +func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn if dm.stopCh != nil { close(dm.stopCh) } + if dm.httpServer != nil { + err := dm.httpServer.stop(ctx) // best-effort + if err != nil { + return err + } + } + return nil } +// ensureShardConfig initializes shards respecting configured shardCount. +func (dm *DistMemory) ensureShardConfig() { //nolint:ireturn + if dm.shardCount <= 0 { + dm.shardCount = defaultDistShardCount + } + + for range dm.shardCount { + dm.shards = append(dm.shards, &distShard{items: cache.New()}) + } +} + +// initMembershipIfNeeded sets up membership/ring and local node defaults. +func (dm *DistMemory) initMembershipIfNeeded() { //nolint:ireturn + if dm.membership == nil { + dm.initStandaloneMembership() + + return + } + + if dm.localNode == nil { + dm.localNode = cluster.NewNode("", "local") + } + + dm.membership.Upsert(dm.localNode) + + dm.ring = dm.membership.Ring() + if dm.nodeAddr == "" && dm.localNode != nil { + dm.nodeAddr = dm.localNode.Address + } +} + +// tryStartHTTP starts internal HTTP transport if not provided. +func (dm *DistMemory) tryStartHTTP(ctx context.Context) { //nolint:ireturn + if dm.transport != nil || dm.nodeAddr == "" { + return + } + + server := newDistHTTPServer(dm.nodeAddr) + + err := server.start(ctx, dm) + if err != nil { // best-effort + return + } + + dm.httpServer = server + resolver := func(nodeID string) (string, bool) { + if dm.membership != nil { + for _, n := range dm.membership.List() { + if string(n.ID) == nodeID { + return "http://" + n.Address, true + } + } + } + + if dm.localNode != nil && string(dm.localNode.ID) == nodeID { + return "http://" + dm.localNode.Address, true + } + + return "", false + } + dm.transport = NewDistHTTPTransport(2*time.Second, resolver) +} + +// startHeartbeatIfEnabled launches heartbeat loop if configured. 
+func (dm *DistMemory) startHeartbeatIfEnabled(ctx context.Context) { //nolint:ireturn + if dm.hbInterval > 0 && dm.transport != nil { + dm.stopCh = make(chan struct{}) + go dm.heartbeatLoop(ctx) + } +} + // lookupOwners returns ring owners slice for a key (nil if no ring). func (dm *DistMemory) lookupOwners(key string) []cluster.NodeID { //nolint:ireturn if dm.ring == nil { @@ -576,7 +638,7 @@ func (dm *DistMemory) getOne(ctx context.Context, key string, owners []cluster.N if dm.isOwner(key) { // local repair if missing if _, ok2 := dm.shardFor(key).items.Get(key); !ok2 { cloned := *it - dm.applySet(&cloned, false) + dm.applySet(ctx, &cloned, false) atomic.AddInt64(&dm.metrics.readRepair, 1) } } @@ -635,7 +697,7 @@ func (dm *DistMemory) getWithConsistency(ctx context.Context, key string, owners } // replicateTo sends writes to replicas (best-effort) returning ack count. -func (dm *DistMemory) replicateTo(item *cache.Item, replicas []cluster.NodeID) int { //nolint:ireturn +func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replicas []cluster.NodeID) int { //nolint:ireturn acks := 0 for _, oid := range replicas { @@ -643,7 +705,7 @@ func (dm *DistMemory) replicateTo(item *cache.Item, replicas []cluster.NodeID) i continue } - if dm.transport != nil && dm.transport.ForwardSet(string(oid), item, false) == nil { + if dm.transport != nil && dm.transport.ForwardSet(ctx, string(oid), item, false) == nil { acks++ } } @@ -697,7 +759,7 @@ func (dm *DistMemory) repairReplicas(ctx context.Context, key string, chosen *ca for _, oid := range owners { if oid == dm.localNode.ID { - dm.repairLocalReplica(key, chosen) + dm.repairLocalReplica(ctx, key, chosen) continue } @@ -707,11 +769,11 @@ func (dm *DistMemory) repairReplicas(ctx context.Context, key string, chosen *ca } // repairLocalReplica updates the local item if stale. 
-func (dm *DistMemory) repairLocalReplica(key string, chosen *cache.Item) { // separated to reduce cyclomatic complexity +func (dm *DistMemory) repairLocalReplica(ctx context.Context, key string, chosen *cache.Item) { // separated to reduce cyclomatic complexity localIt, ok := dm.shardFor(key).items.Get(key) if !ok || localIt.Version < chosen.Version || (localIt.Version == chosen.Version && localIt.Origin > chosen.Origin) { cloned := *chosen - dm.applySet(&cloned, false) + dm.applySet(ctx, &cloned, false) atomic.AddInt64(&dm.metrics.readRepair, 1) } } @@ -724,20 +786,20 @@ func (dm *DistMemory) repairRemoteReplica(ctx context.Context, key string, chose it, ok, _ := dm.transport.ForwardGet(ctx, string(oid), key) //nolint:errcheck if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { // stale - _ = dm.transport.ForwardSet(string(oid), chosen, false) //nolint:errcheck + _ = dm.transport.ForwardSet(ctx, string(oid), chosen, false) //nolint:errcheck atomic.AddInt64(&dm.metrics.readRepair, 1) } } // handleForwardPrimary tries to forward a Set to the primary; returns (proceedAsPrimary,false) if promotion required. -func (dm *DistMemory) handleForwardPrimary(owners []cluster.NodeID, item *cache.Item) (bool, error) { //nolint:ireturn +func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster.NodeID, item *cache.Item) (bool, error) { //nolint:ireturn if dm.transport == nil { return false, sentinel.ErrNotOwner } atomic.AddInt64(&dm.metrics.forwardSet, 1) - errFwd := dm.transport.ForwardSet(string(owners[0]), item, true) + errFwd := dm.transport.ForwardSet(ctx, string(owners[0]), item, true) switch { case errFwd == nil: return false, nil // forwarded successfully @@ -842,7 +904,7 @@ func (dm *DistMemory) isOwner(key string) bool { // applySet stores item locally and optionally replicates to other owners. // replicate indicates whether replication fan-out should occur (false for replica writes). 
-func (dm *DistMemory) applySet(item *cache.Item, replicate bool) { +func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate bool) { dm.shardFor(item.Key).items.Set(item.Key, item) if !replicate || dm.ring == nil { @@ -861,12 +923,12 @@ func (dm *DistMemory) applySet(item *cache.Item, replicate bool) { continue } - _ = dm.transport.ForwardSet(string(oid), item, false) //nolint:errcheck // best-effort replica write + _ = dm.transport.ForwardSet(ctx, string(oid), item, false) //nolint:errcheck // best-effort replica write } } // applyRemove deletes locally and optionally fan-outs removal to replicas. -func (dm *DistMemory) applyRemove(key string, replicate bool) { +func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate bool) { dm.shardFor(key).items.Remove(key) if !replicate || dm.ring == nil || dm.transport == nil { @@ -885,7 +947,7 @@ func (dm *DistMemory) applyRemove(key string, replicate bool) { continue } - _ = dm.transport.ForwardRemove(string(oid), key, false) //nolint:errcheck // best-effort + _ = dm.transport.ForwardRemove(ctx, string(oid), key, false) //nolint:errcheck // best-effort } } diff --git a/pkg/cache/cmap_test.go b/pkg/cache/cmap_test.go index 7260418..73cdaed 100644 --- a/pkg/cache/cmap_test.go +++ b/pkg/cache/cmap_test.go @@ -1,10 +1,11 @@ package cache import ( - "encoding/json" "fmt" "sync" "testing" + + "github.com/goccy/go-json" ) type testStringer struct { diff --git a/tests/hypercache_mgmt_dist_test.go b/tests/hypercache_mgmt_dist_test.go index 4ccf434..ede8e07 100644 --- a/tests/hypercache_mgmt_dist_test.go +++ b/tests/hypercache_mgmt_dist_test.go @@ -2,12 +2,13 @@ package tests import ( "context" - "encoding/json" "io" "net/http" "testing" "time" + "github.com/goccy/go-json" + "github.com/hyp3rd/hypercache" "github.com/hyp3rd/hypercache/internal/constants" "github.com/hyp3rd/hypercache/pkg/backend" diff --git a/tests/management_http_test.go b/tests/management_http_test.go index 
3a9b400..371a19a 100644 --- a/tests/management_http_test.go +++ b/tests/management_http_test.go @@ -2,11 +2,11 @@ package tests import ( "context" - "encoding/json" "net/http" "testing" "time" + "github.com/goccy/go-json" "github.com/longbridgeapp/assert" "github.com/hyp3rd/hypercache"