Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cspell.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ words:
- excludeonly
- exhaustruct
- Fanout
- fctx
- forcetypeassert
- Fprintf
- Fprintln
Expand All @@ -44,8 +45,10 @@ words:
- GOPATH
- gosec
- honnef
- hreq
- ireturn
- Itemm
- lamport
- LFUDA
- localmodule
- logrus
Expand Down
138 changes: 138 additions & 0 deletions pkg/backend/dist_http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package backend

import (
"context"
"net"
"strconv"
"time"

"github.com/goccy/go-json"

"github.com/hyp3rd/ewrap"

fiber "github.com/gofiber/fiber/v3"

cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)

type distHTTPServer struct {
app *fiber.App
ln net.Listener
addr string
}

// minimal request/response types reused by transport
// request/response DTOs defined in dist_http_types.go

const (
httpReadTimeout = 5 * time.Second
httpWriteTimeout = 5 * time.Second
)

func newDistHTTPServer(addr string) *distHTTPServer {
app := fiber.New(fiber.Config{ReadTimeout: httpReadTimeout, WriteTimeout: httpWriteTimeout})

return &distHTTPServer{app: app, addr: addr}
}

func (s *distHTTPServer) start(ctx context.Context, dm *DistMemory) error { //nolint:ireturn
// routes
// set
// POST /internal/cache/set
// body: httpSetRequest
s.app.Post("/internal/cache/set", func(fctx fiber.Ctx) error {
var req httpSetRequest

err := json.Unmarshal(fctx.Body(), &req)
if err != nil {
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": err.Error()})
}

it := &cache.Item{Key: req.Key, Value: req.Value, Expiration: time.Duration(req.Expiration) * time.Millisecond, Version: req.Version, Origin: req.Origin}
if req.Replicate {
dm.applySet(ctx, it, true)

return fctx.JSON(httpSetResponse{})
}

dm.applySet(ctx, it, false)
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context ctx passed to applySet is the server startup context, not the request context. This could cause operations to be cancelled when the server shuts down rather than respecting request timeouts. Use fctx.UserContext() to get the request context instead.

Suggested change
dm.applySet(ctx, it, false)
dm.applySet(fctx.UserContext(), it, true)
return fctx.JSON(httpSetResponse{})
}
dm.applySet(fctx.UserContext(), it, false)

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context ctx passed to applySet is the server startup context, not the request context. This could cause operations to be cancelled when the server shuts down rather than respecting request timeouts. Use fctx.UserContext() to get the request context instead.

Suggested change
dm.applySet(ctx, it, false)
dm.applySet(fctx.UserContext(), it, true)
return fctx.JSON(httpSetResponse{})
}
dm.applySet(fctx.UserContext(), it, false)

Copilot uses AI. Check for mistakes.

return fctx.JSON(httpSetResponse{})
})

s.app.Get("/internal/cache/get", func(fctx fiber.Ctx) error {
key := fctx.Query("key")
if key == "" {
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "missing key"})
}

owners := dm.lookupOwners(key)
if len(owners) == 0 {
return fctx.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "not owner"})
}

if it, ok := dm.shardFor(key).items.Get(key); ok {
return fctx.JSON(httpGetResponse{Found: true, Item: it})
}

return fctx.JSON(httpGetResponse{Found: false})
})

s.app.Delete("/internal/cache/remove", func(fctx fiber.Ctx) error {
key := fctx.Query("key")
if key == "" {
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "missing key"})
}

replicateRaw := fctx.Query("replicate", "false")

replicate, parseErr := strconv.ParseBool(replicateRaw)
if parseErr != nil {
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid replicate"})
}

dm.applyRemove(ctx, key, replicate)
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context ctx passed to applyRemove is the server startup context, not the request context. This could cause operations to be cancelled when the server shuts down rather than respecting request timeouts. Use fctx.UserContext() to get the request context instead.

Suggested change
dm.applyRemove(ctx, key, replicate)
dm.applyRemove(fctx.UserContext(), key, replicate)

Copilot uses AI. Check for mistakes.

return fctx.SendStatus(fiber.StatusOK)
})

// reuse /health in management server or provide inline
s.app.Get("/health", func(fctx fiber.Ctx) error {
return fctx.SendString("ok")
})

lc := net.ListenConfig{}

ln, err := lc.Listen(ctx, "tcp", s.addr)
if err != nil {
return ewrap.Wrap(err, "dist http listen")
}

s.ln = ln

go func() {
err = s.app.Listener(ln)
if err != nil {
return
}
}()

return nil
}

func (s *distHTTPServer) stop(ctx context.Context) error { //nolint:ireturn
if s == nil || s.ln == nil {
return nil
}

ch := make(chan error, 1)

go func() { ch <- s.app.Shutdown() }()

select {
case <-ctx.Done():
return ewrap.Newf("http server shutdown timeout")
case err := <-ch:
return err
}
}
241 changes: 241 additions & 0 deletions pkg/backend/dist_http_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Package backend provides backend implementations including a distributed HTTP transport.
package backend

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/goccy/go-json"

"github.com/hyp3rd/ewrap"

"github.com/hyp3rd/hypercache/internal/sentinel"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)

// DistHTTPTransport implements DistTransport over HTTP JSON.
type DistHTTPTransport struct { // minimal MVP
client *http.Client
baseURLFn func(nodeID string) (string, bool) // resolves nodeID -> base URL (scheme+host)
}

// internal status code threshold for error classification.
const statusThreshold = 300

// NewDistHTTPTransport creates a new HTTP transport.
func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport {
if timeout <= 0 {
timeout = 2 * time.Second
}

return &DistHTTPTransport{
client: &http.Client{Timeout: timeout},
baseURLFn: resolver,
}
}

// request/response DTOs moved to dist_http_types.go

// ForwardSet forwards a set (or replication) request to a remote node.
func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn
base, ok := t.baseURLFn(nodeID)
if !ok {
return sentinel.ErrBackendNotFound
}

reqBody := httpSetRequest{Key: item.Key, Value: item.Value, Expiration: item.Expiration.Milliseconds(), Version: item.Version, Origin: item.Origin, Replicate: replicate}

payloadBytes, err := json.Marshal(&reqBody)
if err != nil {
return ewrap.Wrap(err, "marshal set request")
}

url := base + "/internal/cache/set"

hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payloadBytes)) // background ctx; caller handles outer deadline
if err != nil {
return ewrap.Wrap(err, "new request")
}

hreq.Header.Set("Content-Type", "application/json")

resp, err := t.client.Do(hreq)
if err != nil {
return ewrap.Wrap(err, "do request")
}

defer func() {
_ = resp.Body.Close() //nolint:errcheck // best-effort
}()

if resp.StatusCode == http.StatusNotFound {
return sentinel.ErrBackendNotFound
}

const statusThreshold = 300 // local redeclare for linter clarity
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constant statusThreshold is redeclared locally with the same value as the package-level constant. This creates duplication and potential for inconsistency. Remove the local declaration and use the package-level constant.

Suggested change
const statusThreshold = 300 // local redeclare for linter clarity

Copilot uses AI. Check for mistakes.
if resp.StatusCode >= statusThreshold {
body, rerr := io.ReadAll(resp.Body)
if rerr != nil {
return ewrap.Wrap(rerr, "read error body")
}

return ewrap.Newf("forward set status %d body %s", resp.StatusCode, string(body))
}

return nil
}

// type httpGetResponse formerly used for direct decoding; replaced by map-based decoding to satisfy linters.

// ForwardGet fetches an item from a remote node.
func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) { //nolint:ireturn
base, ok := t.baseURLFn(nodeID)
if !ok {
return nil, false, sentinel.ErrBackendNotFound
}

url := fmt.Sprintf("%s/internal/cache/get?key=%s", base, key)

hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, false, ewrap.Wrap(err, "new request")
}

resp, err := t.client.Do(hreq)
if err != nil {
return nil, false, ewrap.Wrap(err, "do request")
}

defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort

if resp.StatusCode == http.StatusNotFound {
return nil, false, sentinel.ErrBackendNotFound
}

if resp.StatusCode != http.StatusOK {
return nil, false, ewrap.Newf("forward get status %d", resp.StatusCode)
}

item, found, derr := decodeGetBody(resp.Body)
if derr != nil {
return nil, false, derr
}

if !found {
return nil, false, nil
}

return item, true, nil
}

// decodeGetBody decodes a get response body into an item.
func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn
var raw map[string]json.RawMessage

dec := json.NewDecoder(r)

err := dec.Decode(&raw)
if err != nil {
return nil, false, ewrap.Wrap(err, "decode body")
}

var found bool
if fb, ok := raw["found"]; ok {
err := json.Unmarshal(fb, &found)
if err != nil {
return nil, false, ewrap.Wrap(err, "unmarshal found")
}
}

if !found {
return nil, false, nil
}

if ib, ok := raw["item"]; ok && len(ib) > 0 {
// define minimal mirror struct to ensure json tags present (satisfy musttag)
var mirror struct {
Key string `json:"key"`
Value json.RawMessage `json:"value"`
Expiration int64 `json:"expiration"`
Version uint64 `json:"version"`
Origin string `json:"origin"`
}

err := json.Unmarshal(ib, &mirror)
if err != nil {
return nil, false, ewrap.Wrap(err, "unmarshal mirror")
}
// reconstruct cache.Item (we ignore expiration formatting difference vs ms)
return &cache.Item{Key: mirror.Key, Value: mirror.Value, Expiration: time.Duration(mirror.Expiration) * time.Millisecond, Version: mirror.Version, Origin: mirror.Origin}, true, nil
}

return &cache.Item{}, true, nil
}

// ForwardRemove forwards a remove operation.
func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error { //nolint:ireturn
base, ok := t.baseURLFn(nodeID)
if !ok {
return sentinel.ErrBackendNotFound
}

url := fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate)

hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
return ewrap.Wrap(err, "new request")
}

resp, err := t.client.Do(hreq)
if err != nil {
return ewrap.Wrap(err, "do request")
}

defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort

if resp.StatusCode == http.StatusNotFound {
return sentinel.ErrBackendNotFound
}

if resp.StatusCode >= statusThreshold {
return ewrap.Newf("forward remove status %d", resp.StatusCode)
}

return nil
}

// Health probes a remote node health endpoint.
func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { //nolint:ireturn
base, ok := t.baseURLFn(nodeID)
if !ok {
return sentinel.ErrBackendNotFound
}

url := base + "/health"

hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return ewrap.Wrap(err, "new request")
}

resp, err := t.client.Do(hreq)
if err != nil {
return ewrap.Wrap(err, "do request")
}

defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort

if resp.StatusCode == http.StatusNotFound {
return sentinel.ErrBackendNotFound
}

if resp.StatusCode >= statusThreshold {
return ewrap.Newf("health status %d", resp.StatusCode)
}

return nil
}
Loading
Loading