diff --git a/sc/memiavl/multitree.go b/sc/memiavl/multitree.go index 7f53865b..2789b913 100644 --- a/sc/memiavl/multitree.go +++ b/sc/memiavl/multitree.go @@ -339,19 +339,38 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio return fmt.Errorf("target index %d is in the future, latest index: %d", endIndex, lastIndex) } + var replayCount = 0 err = stream.Replay(firstIndex, endIndex, func(index uint64, entry proto.ChangelogEntry) error { - if err := t.apply(entry); err != nil { - return fmt.Errorf("apply rlog entry failed, %w", err) + if err := t.ApplyUpgrades(entry.Upgrades); err != nil { + return err + } + updatedTrees := make(map[string]bool) + for _, cs := range entry.Changesets { + treeName := cs.Name + t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset) + updatedTrees[treeName] = true } - if _, err := t.SaveVersion(false); err != nil { - return fmt.Errorf("replay changeset failed to save version, %w", err) + for _, tree := range t.trees { + if _, found := updatedTrees[tree.Name]; !found { + tree.ApplyChangeSetAsync(iavl.ChangeSet{}) + } + } + t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion) + t.lastCommitInfo.StoreInfos = []proto.StoreInfo{} + replayCount++ + if replayCount%1000 == 0 { + fmt.Printf("Replayed %d changelog entries\n", replayCount) } return nil }) + + for _, tree := range t.trees { + tree.WaitToCompleteAsyncWrite() + } + if err != nil { return err } - t.UpdateCommitInfo() return nil } diff --git a/sc/memiavl/tree.go b/sc/memiavl/tree.go index 18647f60..f498ca61 100644 --- a/sc/memiavl/tree.go +++ b/sc/memiavl/tree.go @@ -17,7 +17,7 @@ import ( var _ types.Tree = (*Tree)(nil) var emptyHash = sha256.New().Sum(nil) -// verify change sets by replay them to rebuild iavl tree and verify the root hashes +// Tree verify change sets by replay them to rebuild iavl tree and verify the root hashes type Tree struct { version uint32 // root node of empty tree is represented as `nil` @@ -31,6 +31,9 @@ type Tree struct { // sync.RWMutex is used to protect the tree for thread safety during snapshot reload mtx *sync.RWMutex + + pendingChanges chan iavl.ChangeSet + pendingWg *sync.WaitGroup } // NewEmptyTree creates an empty tree at an arbitrary version. @@ -43,8 +46,9 @@ func NewEmptyTree(version uint64, initialVersion uint32) *Tree { version: uint32(version), initialVersion: initialVersion, // no need to copy if the tree is not backed by snapshot - zeroCopy: true, - mtx: &sync.RWMutex{}, + zeroCopy: true, + mtx: &sync.RWMutex{}, + pendingWg: &sync.WaitGroup{}, } } @@ -62,10 +66,11 @@ func NewWithInitialVersion(initialVersion uint32) *Tree { // NewFromSnapshot mmap the blob files and create the root node. func NewFromSnapshot(snapshot *Snapshot, zeroCopy bool, _ int) *Tree { tree := &Tree{ - version: snapshot.Version(), - snapshot: snapshot, - zeroCopy: zeroCopy, - mtx: &sync.RWMutex{}, + version: snapshot.Version(), + snapshot: snapshot, + zeroCopy: zeroCopy, + mtx: &sync.RWMutex{}, + pendingWg: &sync.WaitGroup{}, } if !snapshot.IsEmpty() { @@ -116,6 +121,31 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) { } } +func (t *Tree) ApplyChangeSetAsync(changeSet iavl.ChangeSet) { + if t.pendingChanges == nil { + t.StartBackgroundWrite() + } + t.pendingChanges <- changeSet +} + +func (t *Tree) StartBackgroundWrite() { + t.pendingWg.Add(1) + t.pendingChanges = make(chan iavl.ChangeSet, 1000) + go func() { + defer t.pendingWg.Done() + for nextChange := range t.pendingChanges { + t.ApplyChangeSet(nextChange) + _, _, _ = t.SaveVersion(false) + } + }() +} + +func (t *Tree) WaitToCompleteAsyncWrite() { + close(t.pendingChanges) + t.pendingWg.Wait() + t.pendingChanges = nil +} + func (t *Tree) Set(key, value []byte) { t.mtx.Lock() defer t.mtx.Unlock() @@ -270,6 +300,9 @@ func (t *Tree) Close() error { err = t.snapshot.Close() t.snapshot = nil } + if t.pendingChanges != nil { + close(t.pendingChanges) + } t.root = nil return err } diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index ee0269b0..cd51f0f4 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -160,7 +160,6 @@ func (db *Database) SetLatestVersion(version int64) error { var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) - fmt.Printf("SetLatestVersion: version=%d, err=%v, latestVersionKey=%s\n", version, err, latestVersionKey) return err }