Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions util/stmtsummary/v2/stmtsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,25 +367,29 @@ func (s *StmtSummary) rotateLoop() {
case <-tick.C:
now := timeNow()
s.windowLock.Lock()
w := s.window
// The current window has expired and needs to be refreshed and persisted.
if now.After(w.begin.Add(time.Duration(s.RefreshInterval()) * time.Second)) {
s.window = newStmtWindow(now, uint(s.MaxStmtCount()))
size := w.lru.Size()
if size > 0 {
// Persist window asynchronously.
s.closeWg.Add(1)
go func() {
defer s.closeWg.Done()
s.storage.persist(w, now)
}()
}
if now.After(s.window.begin.Add(time.Duration(s.RefreshInterval()) * time.Second)) {
s.rotate(now)
}
s.windowLock.Unlock()
}
}
}

func (s *StmtSummary) rotate(now time.Time) {
w := s.window
s.window = newStmtWindow(now, uint(s.MaxStmtCount()))
size := w.lru.Size()
if size > 0 {
// Persist window asynchronously.
s.closeWg.Add(1)
go func() {
defer s.closeWg.Done()
s.storage.persist(w, now)
}()
}
}

// stmtWindow represents a single statistical window, which has a begin
// time and an end time. Data within a single window is eliminated
// according to the LRU strategy. All evicted data will be aggregated
Expand Down
21 changes: 1 addition & 20 deletions util/stmtsummary/v2/stmtsummary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package stmtsummary

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -47,7 +45,6 @@ func TestStmtSummary(t *testing.T) {
ss := NewStmtSummary4Test(3)
defer ss.Close()

ss.storage = &waitableMockStmtStorage{mockStmtStorage: ss.storage.(*mockStmtStorage)}
w := ss.window
ss.Add(GenerateStmtExecInfo4Test("digest1"))
ss.Add(GenerateStmtExecInfo4Test("digest2"))
Expand All @@ -57,14 +54,8 @@ func TestStmtSummary(t *testing.T) {
require.Equal(t, 3, w.lru.Size())
require.Equal(t, 2, w.evicted.count())

ss.storage.(*waitableMockStmtStorage).Add(1)
newEnd := w.begin.Add(time.Duration(ss.RefreshInterval()+1) * time.Second)
timeNow = func() time.Time {
return newEnd
}
ss.storage.(*waitableMockStmtStorage).Wait()
ss.rotate(timeNow())

timeNow = time.Now
ss.Add(GenerateStmtExecInfo4Test("digest6"))
ss.Add(GenerateStmtExecInfo4Test("digest7"))
w = ss.window
Expand All @@ -74,13 +65,3 @@ func TestStmtSummary(t *testing.T) {
ss.Clear()
require.Equal(t, 0, w.lru.Size())
}

type waitableMockStmtStorage struct {
sync.WaitGroup
*mockStmtStorage
}

func (s *waitableMockStmtStorage) persist(w *stmtWindow, end time.Time) {
defer s.Done()
s.mockStmtStorage.persist(w, end)
}