From 279284b4c36402d2cd54b7a617cc73fc522b96ba Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 02:09:19 -0800 Subject: [PATCH 1/8] Use multiple goroutine for replay --- sc/memiavl/multitree.go | 21 ++++++++++++++++++-- sc/memiavl/tree.go | 43 ++++++++++++++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 7f53865b..9097bf03 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -339,15 +339,32 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio return fmt.Errorf("target index %d is in the future, latest index: %d", endIndex, lastIndex) } + var replayCount = 0 + for _, tree := range t.trees { + tree.StartBackgroundWrite() + } err = stream.Replay(firstIndex, endIndex, func(index uint64, entry proto.ChangelogEntry) error { - if err := t.apply(entry); err != nil { - return fmt.Errorf("apply rlog entry failed, %w", err) + if err := t.ApplyUpgrades(entry.Upgrades); err != nil { + return err + } + for _, cs := range entry.Changesets { + treeName := cs.Name + t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset) } if _, err := t.SaveVersion(false); err != nil { return fmt.Errorf("replay changeset failed to save version, %w", err) } + replayCount++ + if replayCount%1000 == 0 { + fmt.Printf("Replayed %d changelog entries\n", replayCount) + } return nil }) + + for _, tree := range t.trees { + tree.WaitToCompleteAsyncWrite() + } + if err != nil { return err } diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index 18647f60..fea0be8c 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -17,7 +17,7 @@ import ( var _ types.Tree = (*Tree)(nil) var emptyHash = sha256.New().Sum(nil) -// verify change sets by replay them to rebuild iavl tree and verify the root hashes +// Tree verify change sets by replay them to rebuild iavl tree and verify the root hashes type Tree struct { version uint32 // root node of empty tree is represented as `nil` @@ -31,6 +31,9 @@ type Tree struct { // sync.RWMutex is used to protect the tree for thread safety during snapshot reload mtx *sync.RWMutex + + pendingChanges chan iavl.ChangeSet + pendingWg sync.WaitGroup } // NewEmptyTree creates an empty tree at an arbitrary version. @@ -43,8 +46,9 @@ func NewEmptyTree(version uint64, initialVersion uint32) *Tree { version: uint32(version), initialVersion: initialVersion, // no need to copy if the tree is not backed by snapshot - zeroCopy: true, - mtx: &sync.RWMutex{}, + zeroCopy: true, + mtx: &sync.RWMutex{}, + pendingWg: sync.WaitGroup{}, } } @@ -62,10 +66,11 @@ func NewWithInitialVersion(initialVersion uint32) *Tree { // NewFromSnapshot mmap the blob files and create the root node. func NewFromSnapshot(snapshot *Snapshot, zeroCopy bool, _ int) *Tree { tree := &Tree{ - version: snapshot.Version(), - snapshot: snapshot, - zeroCopy: zeroCopy, - mtx: &sync.RWMutex{}, + version: snapshot.Version(), + snapshot: snapshot, + zeroCopy: zeroCopy, + mtx: &sync.RWMutex{}, + pendingWg: sync.WaitGroup{}, } if !snapshot.IsEmpty() { @@ -116,6 +121,27 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) { } } +func (t *Tree) ApplyChangeSetAsync(changeSet iavl.ChangeSet) { + t.pendingChanges <- changeSet +} + +func (t *Tree) StartBackgroundWrite() { + t.pendingWg.Add(1) + t.pendingChanges = make(chan iavl.ChangeSet, 1000) + go func() { + defer t.pendingWg.Done() + for nextChange := range t.pendingChanges { + t.ApplyChangeSet(nextChange) + } + }() +} + +func (t *Tree) WaitToCompleteAsyncWrite() { + close(t.pendingChanges) + t.pendingWg.Wait() + t.pendingChanges = nil +} + func (t *Tree) Set(key, value []byte) { t.mtx.Lock() defer t.mtx.Unlock() @@ -270,6 +296,9 @@ func (t *Tree) Close() error { err = t.snapshot.Close() t.snapshot = nil } + if t.pendingChanges != nil { + close(t.pendingChanges) + } t.root = nil return err } From 6c71a01d1649676a389687467a3d2e34c677cadd Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 02:21:20 -0800 Subject: [PATCH 2/8] Fix save version --- sc/memiavl/multitree.go | 6 ++---- sc/memiavl/tree.go | 1 + 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 9097bf03..8af8128c 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -351,9 +351,7 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio treeName := cs.Name t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset) } - if _, err := t.SaveVersion(false); err != nil { - return fmt.Errorf("replay changeset failed to save version, %w", err) - } + t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion) replayCount++ if replayCount%1000 == 0 { fmt.Printf("Replayed %d changelog entries\n", replayCount) @@ -368,7 +366,7 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio if err != nil { return err } - + t.lastCommitInfo.StoreInfos = []proto.StoreInfo{} t.UpdateCommitInfo() return nil } diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index fea0be8c..65b1bf7e 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -132,6 +132,7 @@ func (t *Tree) StartBackgroundWrite() { defer t.pendingWg.Done() for nextChange := range t.pendingChanges { t.ApplyChangeSet(nextChange) + t.SaveVersion(false) } }() } From 872391924657a6d1261cb0bead62c81fe70aed03 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 02:31:52 -0800 Subject: [PATCH 3/8] Debug --- sc/memiavl/multitree.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 8af8128c..13df5b60 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -349,9 +349,11 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio } for _, cs := range entry.Changesets { treeName := cs.Name + fmt.Printf("[Debug] Changeset for version %d and tree %s\n", index, treeName) t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset) } t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion) + t.lastCommitInfo.StoreInfos = []proto.StoreInfo{} replayCount++ if replayCount%1000 == 0 { fmt.Printf("Replayed %d changelog entries\n", replayCount) @@ -366,7 +368,6 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio if err != nil { return err } - t.lastCommitInfo.StoreInfos = []proto.StoreInfo{} t.UpdateCommitInfo() return nil } From 20ca56004fd69541bd60c19434e406f11f882555 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 02:37:44 -0800 Subject: [PATCH 4/8] Save version correctly --- sc/memiavl/multitree.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 13df5b60..68ef4ae8 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -347,10 +347,16 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio if err := t.ApplyUpgrades(entry.Upgrades); err != nil { return err } + updatedTrees := make(map[string]bool) for _, cs := range entry.Changesets { treeName := cs.Name - fmt.Printf("[Debug] Changeset for version %d and tree %s\n", index, treeName) t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset) + updatedTrees[treeName] = true + } + for _, tree := range t.trees { + if _, found := updatedTrees[tree.Name]; !found { + tree.SaveVersion(false) + } } t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion) t.lastCommitInfo.StoreInfos = []proto.StoreInfo{} From 5fd03f9d7866966e1c718b6a9889dbfca3e76f90 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 02:49:51 -0800 Subject: [PATCH 5/8] Try fix --- sc/memiavl/multitree.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 68ef4ae8..aceed4b8 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -355,7 +355,7 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio } for _, tree := range t.trees { if _, found := updatedTrees[tree.Name]; !found { - tree.SaveVersion(false) + tree.ApplyChangeSetAsync(iavl.ChangeSet{}) } } t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion) From bdc0ad501ee9793fc7ce53004b46d16da8598d0f Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 07:56:30 -0800 Subject: [PATCH 6/8] Fix lint --- sc/memiavl/tree.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index 65b1bf7e..22b398d6 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -132,7 +132,7 @@ func (t *Tree) StartBackgroundWrite() { defer t.pendingWg.Done() for nextChange := range t.pendingChanges { t.ApplyChangeSet(nextChange) - t.SaveVersion(false) + _, _, _ = t.SaveVersion(false) } }() } From 4fee0b1bf36301a299386914b4a5a1a38267a01e Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 07:58:30 -0800 Subject: [PATCH 7/8] Fix lint --- sc/memiavl/tree.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index 22b398d6..f7e23bf6 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -33,7 +33,7 @@ type Tree struct { mtx *sync.RWMutex pendingChanges chan iavl.ChangeSet - pendingWg sync.WaitGroup + pendingWg *sync.WaitGroup } // NewEmptyTree creates an empty tree at an arbitrary version. @@ -48,7 +48,7 @@ func NewEmptyTree(version uint64, initialVersion uint32) *Tree { // no need to copy if the tree is not backed by snapshot zeroCopy: true, mtx: &sync.RWMutex{}, - pendingWg: sync.WaitGroup{}, + pendingWg: &sync.WaitGroup{}, } } @@ -70,7 +70,7 @@ func NewFromSnapshot(snapshot *Snapshot, zeroCopy bool, _ int) *Tree { snapshot: snapshot, zeroCopy: zeroCopy, mtx: &sync.RWMutex{}, - pendingWg: sync.WaitGroup{}, + pendingWg: &sync.WaitGroup{}, } if !snapshot.IsEmpty() { From 87040c8f4d3a8f408a4c2a06ab80f6b7281a5fbe Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 7 Jan 2025 10:53:45 -0800 Subject: [PATCH 8/8] Fix unit test --- sc/memiavl/multitree.go | 3 --- sc/memiavl/tree.go | 3 +++ ss/pebbledb/db.go | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index aceed4b8..2789b913 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -340,9 +340,6 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio } var replayCount = 0 - for _, tree := range t.trees { - tree.StartBackgroundWrite() - } err = stream.Replay(firstIndex, endIndex, func(index uint64, entry proto.ChangelogEntry) error { if err := t.ApplyUpgrades(entry.Upgrades); err != nil { return err diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index f7e23bf6..f498ca61 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -122,6 +122,9 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) { } func (t *Tree) ApplyChangeSetAsync(changeSet iavl.ChangeSet) { + if t.pendingChanges == nil { + t.StartBackgroundWrite() + } t.pendingChanges <- changeSet } diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index d5eccb99..5ec947e9 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -159,7 +159,6 @@ func (db *Database) SetLatestVersion(version int64) error { var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) - fmt.Printf("SetLatestVersion: version=%d, err=%v, latestVersionKey=%s\n", version, err, latestVersionKey) return err }