Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 78 additions & 8 deletions crdt/crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ package crdt
import (
"encoding/json"
"log/slog"
"reflect"
"strings"
"sync"

deep "github.com/brunoga/deep/v5"
"github.com/brunoga/deep/v5/crdt/hlc"
icore "github.com/brunoga/deep/v5/internal/core"
)

// LWW represents a Last-Write-Wins register for type T.
Expand Down Expand Up @@ -192,9 +195,42 @@ func (c *CRDT[T]) ApplyDelta(delta Delta[T]) bool {
return deep.Apply(&c.value, deep.Patch[T]{Operations: filtered}) == nil
}

// textType is the reflect.Type of crdt.Text, used to identify Text fields
// during merge without a full value-tree walk.
var textType = reflect.TypeOf(Text{})

// textAncestorPath walks up from opPath toward the root, resolving each prefix
// against root, and returns the path of the nearest ancestor whose value is of
// type Text together with true. It returns ("", false) if the op does not
// belong to a Text field. Walking stops as soon as a valid non-Text ancestor is
// found, keeping traversal to O(depth) Resolve calls per op.
func textAncestorPath(root reflect.Value, opPath string) (string, bool) {
path := opPath
for {
idx := strings.LastIndexByte(path, '/')
if idx <= 0 {
break
}
path = path[:idx]
val, err := icore.DeepPath(path).Resolve(root)
if err != nil || !val.IsValid() {
// Unresolvable prefix (e.g. map key not present locally) — keep walking up.
continue
}
if val.Type() == textType {
return path, true
}
// A valid non-Text ancestor was found at this level, but it might itself
// be nested inside a Text (e.g. a TextRun struct whose parent is Text).
// Keep walking up rather than breaking.
}
return "", false
}

// Merge performs a full state-based merge with another CRDT node.
// For each changed field the node with the strictly newer effective timestamp
// (max of write clock and tombstone) wins.
// (max of write clock and tombstone) wins. Text fields are always merged
// convergently via MergeTextRuns, bypassing LWW.
func (c *CRDT[T]) Merge(other *CRDT[T]) bool {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -206,7 +242,7 @@ func (c *CRDT[T]) Merge(other *CRDT[T]) bool {
c.clock.Update(h)
}

// Text has its own convergent merge that doesn't rely on per-field clocks.
// Fast path: T itself is a Text.
if v, ok := any(c.value).(Text); ok {
otherV := any(other.value).(Text)
c.value = any(MergeTextRuns(v, otherV)).(T)
Expand All @@ -220,10 +256,22 @@ func (c *CRDT[T]) Merge(other *CRDT[T]) bool {
return false
}

// State-based LWW: apply each op only if the remote effective time is
// strictly newer than the local effective time for that path.
localRoot := reflect.ValueOf(&c.value).Elem()
otherRoot := reflect.ValueOf(&other.value).Elem()

// Separate Text-field ops from LWW-eligible ops. Text is convergent, so
// we collect affected Text paths and apply MergeTextRuns after the LWW
// apply — no full tree walk required.
textPaths := make(map[string]struct{})
var filtered []deep.Operation
for _, op := range patch.Operations {
if textPath, ok := textAncestorPath(localRoot, op.Path); ok {
textPaths[textPath] = struct{}{}
continue
}

// State-based LWW: apply each op only if the remote effective time is
// strictly newer than the local effective time for that path.
rClock, hasRC := other.clocks[op.Path]
rTomb, hasRT := other.tombstones[op.Path]

Expand Down Expand Up @@ -260,11 +308,33 @@ func (c *CRDT[T]) Merge(other *CRDT[T]) bool {

c.mergeMeta(other)

if len(filtered) == 0 {
return false
changed := len(filtered) > 0
if changed {
_ = deep.Apply(&c.value, deep.Patch[T]{Operations: filtered})
// Refresh localRoot: Apply may have updated c.value in place.
localRoot = reflect.ValueOf(&c.value).Elem()
}
_ = deep.Apply(&c.value, deep.Patch[T]{Operations: filtered})
return true

// Convergently merge each Text field by path. Both values are resolved
// fresh from the (already-updated) local root and the remote root.
for textPath := range textPaths {
localVal, err := icore.DeepPath(textPath).Resolve(localRoot)
if err != nil || !localVal.IsValid() {
continue
}
remoteVal, err := icore.DeepPath(textPath).Resolve(otherRoot)
if err != nil || !remoteVal.IsValid() {
continue
}
merged := MergeTextRuns(localVal.Interface().(Text), remoteVal.Interface().(Text))
if err := icore.DeepPath(textPath).Set(localRoot, reflect.ValueOf(merged)); err != nil {
slog.Default().Error("crdt: Merge text set failed", "path", textPath, "err", err)
continue
}
changed = true
}

return changed
}

func (c *CRDT[T]) mergeMeta(other *CRDT[T]) {
Expand Down
184 changes: 184 additions & 0 deletions crdt/crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crdt

import (
"reflect"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -98,6 +99,189 @@ func TestCRDT_Conflict(t *testing.T) {
}
}

// --- Fix: Text fields in structs are merged convergently, not via LWW ---

type docWithText struct {
Title string
Body Text
}

// Both nodes concurrently insert into the same Text field. After a bidirectional
// Merge both must hold the same text containing both insertions.
func TestMerge_TextFieldConvergent(t *testing.T) {
nodeA := NewCRDT(docWithText{Title: "doc"}, "node-a")
nodeB := NewCRDT(docWithText{Title: "doc"}, "node-b")

nodeA.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(0, "hello", nodeA.Clock())
})
nodeB.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(0, "world", nodeB.Clock())
})

nodeA.Merge(nodeB)
nodeB.Merge(nodeA)

viewA := nodeA.View()
viewB := nodeB.View()

if viewA.Body.String() != viewB.Body.String() {
t.Errorf("Text diverged after merge: A=%q B=%q", viewA.Body.String(), viewB.Body.String())
}
combined := viewA.Body.String()
if !strings.Contains(combined, "hello") || !strings.Contains(combined, "world") {
t.Errorf("Text merge dropped content: got %q", combined)
}
}

// Even when nodeA's edit has an older HLC timestamp, its text must still
// appear in the merged result — Text bypasses LWW entirely.
func TestMerge_TextNotLWWFiltered(t *testing.T) {
nodeA := NewCRDT(docWithText{}, "node-a")
nodeB := NewCRDT(docWithText{}, "node-b")

// nodeA edits first (older wall-clock time)
nodeA.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(0, "older", nodeA.Clock())
})
time.Sleep(2 * time.Millisecond)
// nodeB edits later (newer wall-clock time)
nodeB.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(0, "newer", nodeB.Clock())
})

nodeA.Merge(nodeB)

combined := nodeA.View().Body.String()
if !strings.Contains(combined, "older") || !strings.Contains(combined, "newer") {
t.Errorf("LWW incorrectly filtered Text: got %q, want both 'older' and 'newer'", combined)
}
}

// Fix: TextRun sub-field ops (e.g. /Body/<id>/Deleted) must still be
// detected as Text-field ops and handled via MergeTextRuns, not LWW.
func TestMerge_TextSubFieldOp(t *testing.T) {
nodeA := NewCRDT(docWithText{}, "node-a")
nodeB := NewCRDT(docWithText{}, "node-b")

// nodeA inserts a run; nodeB gets the same initial state then deletes it.
nodeA.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(0, "shared", nodeA.Clock())
})
// Sync nodeB to the same starting state.
nodeB.Merge(nodeA)

// nodeB soft-deletes the run (produces a /Body/<id>/Deleted sub-field op on next diff).
nodeB.Edit(func(d *docWithText) {
d.Body = d.Body.Delete(0, len("shared"))
})
// nodeA inserts more text concurrently.
nodeA.Edit(func(d *docWithText) {
d.Body = d.Body.Insert(len("shared"), " appended", nodeA.Clock())
})

nodeA.Merge(nodeB)
nodeB.Merge(nodeA)

viewA := nodeA.View()
viewB := nodeB.View()

if viewA.Body.String() != viewB.Body.String() {
t.Errorf("Sub-field op: Text diverged: A=%q B=%q", viewA.Body.String(), viewB.Body.String())
}
}

// Fix: Text field nested inside a nested struct must be detected and merged.
func TestMerge_TextInNestedStruct(t *testing.T) {
type inner struct{ Notes Text }
type outer struct{ Meta inner }

nodeA := NewCRDT(outer{}, "node-a")
nodeB := NewCRDT(outer{}, "node-b")

nodeA.Edit(func(o *outer) {
o.Meta.Notes = o.Meta.Notes.Insert(0, "note-a", nodeA.Clock())
})
nodeB.Edit(func(o *outer) {
o.Meta.Notes = o.Meta.Notes.Insert(0, "note-b", nodeB.Clock())
})

nodeA.Merge(nodeB)
nodeB.Merge(nodeA)

viewA := nodeA.View()
viewB := nodeB.View()

if viewA.Meta.Notes.String() != viewB.Meta.Notes.String() {
t.Errorf("Nested Text diverged: A=%q B=%q",
viewA.Meta.Notes.String(), viewB.Meta.Notes.String())
}
combined := viewA.Meta.Notes.String()
if !strings.Contains(combined, "note-a") || !strings.Contains(combined, "note-b") {
t.Errorf("Nested Text merge dropped content: got %q", combined)
}
}

// Fix: Text field inside a keyed-slice element must be detected by key, not
// by positional index. The old tree-walk would misalign elements if the two
// nodes had different slice orderings.
func TestMerge_TextInKeyedSliceElement(t *testing.T) {
type section struct {
ID int `deep:"key"`
Content Text
}
type article struct{ Sections []section }

initial := article{Sections: []section{
{ID: 1},
{ID: 2},
}}

nodeA := NewCRDT(initial, "node-a")
nodeB := NewCRDT(initial, "node-b")

// Both nodes write to section 2's Content concurrently.
nodeA.Edit(func(a *article) {
a.Sections[1].Content = a.Sections[1].Content.Insert(0, "from-a", nodeA.Clock())
})
nodeB.Edit(func(a *article) {
a.Sections[1].Content = a.Sections[1].Content.Insert(0, "from-b", nodeB.Clock())
})
// NodeA also prepends a new section, shifting positional indices.
nodeA.Edit(func(a *article) {
a.Sections = append([]section{{ID: 0}}, a.Sections...)
})

nodeA.Merge(nodeB)
nodeB.Merge(nodeA)

findSection := func(secs []section, id int) *section {
for i := range secs {
if secs[i].ID == id {
return &secs[i]
}
}
return nil
}

viewA := nodeA.View()
viewB := nodeB.View()

s2A := findSection(viewA.Sections, 2)
s2B := findSection(viewB.Sections, 2)
if s2A == nil || s2B == nil {
t.Fatal("section 2 missing after merge")
}
if s2A.Content.String() != s2B.Content.String() {
t.Errorf("Section 2 Text diverged: A=%q B=%q",
s2A.Content.String(), s2B.Content.String())
}
combined := s2A.Content.String()
if !strings.Contains(combined, "from-a") || !strings.Contains(combined, "from-b") {
t.Errorf("Section 2 Text missing content: got %q", combined)
}
}

func TestCRDT_JSON(t *testing.T) {
node := NewCRDT(TestUser{ID: 1, Name: "Initial"}, "node1")
node.Edit(func(u *TestUser) {
Expand Down
Loading
Loading