Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.
6 changes: 2 additions & 4 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) {

// FlushPointsExcept implements CheckPoint.FlushPointsExcept
func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error {
cp.RLock()
defer cp.RUnlock()

cp.Lock()
defer cp.Unlock()
// convert slice to map
excepts := make(map[string]map[string]struct{})
for _, schemaTable := range exceptTables {
Expand Down Expand Up @@ -414,7 +413,6 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
for _, point := range points {
point.flush()
}

cp.globalPointSaveTime = time.Now()
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions syncer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
flush
skip // used by Syncer.recordSkipSQLsPos to record global pos, but not execute SQL
rotate
fake // used to flush worker's checkpoint
)

func (t opType) String() string {
Expand All @@ -53,6 +54,8 @@ func (t opType) String() string {
return "skip"
case rotate:
return "rotate"
case fake:
return "fake"
}

return ""
Expand Down Expand Up @@ -153,6 +156,13 @@ func newFlushJob() *job {
}
}

// newFakeJob builds a job of type fake that carries nothing but a binlog
// position. It holds no SQL; a worker receiving it only uses pos to refresh
// its own checkpoint.
func newFakeJob(pos mysql.Position) *job {
	fakeJob := new(job)
	fakeJob.tp = fake
	fakeJob.pos = pos
	return fakeJob
}

func newSkipJob(pos mysql.Position, currentGtidSet gtid.Set) *job {
return &job{
tp: skip,
Expand Down
141 changes: 129 additions & 12 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ const (
LocalBinlog
)

// FlushType is the kind of request sent to the asynchronous checkpoint
// flushing goroutine.
type FlushType uint8

// Request kinds accepted by the asynchronous checkpoint flushing goroutine.
const (
	// NoNeedUpdate asks for the checkpoint to be flushed as it currently stands.
	// It is meant for DDL jobs: their global checkpoint is already updated in
	// addJobFunc, so it must not be recomputed from the workers.
	NoNeedUpdate FlushType = 1
	// NeedUpdate asks for the global checkpoint to be moved to the minimum
	// position among the workers before the checkpoint is flushed. It is sent
	// when the global checkpoint has not been updated for more than 30s, or
	// when a flush job is received.
	NeedUpdate FlushType = 2
)

// Selection modes used when deriving a single position from the worker
// checkpoints.
const (
	// GetMinPos selects the smallest position among the worker checkpoints.
	GetMinPos int = 1
	// GetMaxPos selects the largest position among the worker checkpoints.
	GetMaxPos int = 2
)

// StreamerProducer provides the ability to generate binlog streamer by StartSync()
// but go-mysql StartSync() returns (struct, err) rather than (interface, err)
// And we can't simply use StartSync() method in StreamerProducer
Expand Down Expand Up @@ -161,6 +181,10 @@ type Syncer struct {
jobsChanLock sync.Mutex
queueBucketMapping []string

workerCheckpoints []*binlogPoint
flushCheckpointChan chan FlushType
lastAddedJobPos *binlogPoint

c *causality

tableRouter *router.Table
Expand Down Expand Up @@ -502,6 +526,9 @@ func (s *Syncer) reset() {
s.resetReplicationSyncer()
// create new job chans
s.newJobChans(s.cfg.WorkerCount + 1)
s.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint)
s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount)
s.flushCheckpointChan = make(chan FlushType, 16)
// clear tables info
s.clearAllTables()

Expand Down Expand Up @@ -736,21 +763,25 @@ func (s *Syncer) checkWait(job *job) bool {
return true
}

if s.checkpoint.CheckGlobalPoint() {
return true
}

return false
}

func (s *Syncer) addJob(job *job) error {
defer func() {
if job.tp == insert || job.tp == update || job.tp == del {
pos := job.pos
if s.cfg.IsSharding {
pos = s.sgk.AdjustGlobalPoint(pos)
}
s.lastAddedJobPos.save(pos)
}
}()
var (
queueBucket int
execDDLReq *pb.ExecDDLRequest
)
switch job.tp {
case xid:
s.saveGlobalPoint(job.pos)
return nil
case flush:
addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc()
Expand All @@ -764,7 +795,11 @@ func (s *Syncer) addJob(job *job) error {
}
s.jobWg.Wait()
finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc()
return s.flushCheckPoints()
select {
case <-s.done:
case s.flushCheckpointChan <- NeedUpdate:
}
return nil
case ddl:
s.jobWg.Wait()
addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc()
Expand Down Expand Up @@ -814,8 +849,16 @@ func (s *Syncer) addJob(job *job) error {
}
}

if wait {
return s.flushCheckPoints()
if s.checkpoint.CheckGlobalPoint() {
select {
case <-s.done:
case s.flushCheckpointChan <- NeedUpdate:
}
} else if wait {
select {
case <-s.done:
case s.flushCheckpointChan <- NoNeedUpdate:
}
}

return nil
Expand Down Expand Up @@ -945,13 +988,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
}
}

func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) {
func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn,
jobChan chan *job, workerCheckpoint *binlogPoint) {
defer s.wg.Done()

idx := 0
count := s.cfg.Batch
jobs := make([]*job, 0, count)
tpCnt := make(map[opType]int64)
lastUpdateCheckpointTime := time.Now()

clearF := func() {
for i := 0; i < idx; i++ {
Expand Down Expand Up @@ -988,6 +1033,9 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
if err != nil {
errCtx := &ExecErrorContext{err, jobs[affected].currentPos, fmt.Sprintf("%v", jobs)}
s.appendExecErrors(errCtx)
} else {
workerCheckpoint.save(jobs[len(jobs)-1].pos)
lastUpdateCheckpointTime = time.Now()
}
if s.tracer.Enable() {
syncerJobState := s.tracer.FinishedSyncerJobState(err)
Expand All @@ -1009,8 +1057,16 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
if !ok {
return
}
idx++
if sqlJob.tp == fake {
// update pos only if no jobs are waiting to be executed
if len(jobs) == 0 {
workerCheckpoint.save(sqlJob.pos)
lastUpdateCheckpointTime = time.Now()
}
continue
}

idx++
if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
jobs = append(jobs, sqlJob)
tpCnt[sqlJob.tp]++
Expand All @@ -1033,9 +1089,50 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
continue
}
clearF()
} else if time.Now().Sub(lastUpdateCheckpointTime) > 3*time.Second {
jobChan <- newFakeJob(s.lastAddedJobPos.MySQLPos())
}
}
}
}

// updateGlobalCheckpointFromWorkers derives one position from the per-worker
// checkpoints and saves it as the global checkpoint.
//
// typ selects the position to keep: GetMinPos keeps the smallest worker
// position, GetMaxPos keeps the largest. Worker checkpoints that are not
// greater than minCheckpoint are ignored, and if no worker qualifies the
// global checkpoint is left untouched.
func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) {
	chosen := minCheckpoint
	for _, wcp := range s.workerCheckpoints {
		candidate := wcp.MySQLPos()
		if candidate.Compare(minCheckpoint) <= 0 {
			// This worker's checkpoint is not above minCheckpoint; skip it.
			continue
		}
		switch {
		case chosen.Compare(minCheckpoint) == 0:
			// First qualifying worker: take its position unconditionally.
			chosen = candidate
		case typ == GetMinPos && candidate.Compare(chosen) < 0:
			chosen = candidate
		case typ == GetMaxPos && candidate.Compare(chosen) > 0:
			chosen = candidate
		}
	}
	if chosen.Compare(minCheckpoint) <= 0 {
		return
	}
	s.saveGlobalPoint(chosen)
}

// asyncFlushCheckpoint serves flush requests read from flushChan until ctx is
// done.
//
// For a NeedUpdate request the global checkpoint is first set to the minimum
// position among the workers; every request then triggers flushCheckPoints.
// A flush error that is not a context-canceled error is reported on
// s.runFatalChan and ends the routine; context-canceled errors are ignored
// and the loop keeps running.
func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) {
	for {
		select {
		case <-ctx.Done():
			s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err()))
			return
		case req := <-flushChan:
			if req == NeedUpdate {
				s.updateGlobalCheckpointFromWorkers(GetMinPos)
			}
			if err := s.flushCheckPoints(); err != nil && !utils.IsContextCanceledError(err) {
				s.runFatalChan <- unit.NewProcessError(pb.ErrorType_UnknownError, err)
				return
			}
		}
	}
}

// redirectStreamer redirects binlog stream to given position
Expand Down Expand Up @@ -1096,11 +1193,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
go func(i int, n string) {
ctx2, cancel := context.WithCancel(ctx)
ctctx := s.tctx.WithContext(ctx2)
s.sync(ctctx, n, s.toDBConns[i], s.jobs[i])
s.sync(ctctx, n, s.toDBConns[i], s.jobs[i], s.workerCheckpoints[i])
cancel()
}(i, name)
}

s.wg.Add(1)
go func() {
defer s.wg.Done()
s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan)
}()

s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName)
s.wg.Add(1)
go func() {
Expand All @@ -1124,6 +1227,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

s.jobWg.Wait()
s.updateGlobalCheckpointFromWorkers(GetMaxPos)
if err2 := s.flushCheckPoints(); err2 != nil {
s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}
Expand Down Expand Up @@ -1896,6 +2000,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
if ddlExecItem.req.Exec {
failpoint.Inject("ShardSyncedExecutionExit", func() {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit"))
s.jobWg.Wait()
s.updateGlobalCheckpointFromWorkers(GetMaxPos)
s.flushCheckPoints()
utils.OsExit(1)
})
Expand All @@ -1905,6 +2011,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
// exit in the first round sequence sharding DDL only
if group.meta.ActiveIdx() == 1 {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit"))
s.jobWg.Wait()
s.updateGlobalCheckpointFromWorkers(GetMaxPos)
s.flushCheckPoints()
utils.OsExit(1)
}
Expand Down Expand Up @@ -2311,8 +2419,9 @@ func (s *Syncer) Pause() {
s.tctx.L().Warn("try to pause, but already closed")
return
}

s.Lock()
s.stopSync()
s.Unlock()
}

// Resume resumes the paused process
Expand Down Expand Up @@ -2530,3 +2639,11 @@ func (s *Syncer) setTimezone() {
s.tctx.L().Info("use timezone", log.WrapStringerField("location", loc))
s.timezone = loc
}

// makeWorkerCheckpointArray returns a slice holding workerCount freshly
// created binlog points, each one initialised with minCheckpoint.
func makeWorkerCheckpointArray(workerCount int) []*binlogPoint {
	points := make([]*binlogPoint, 0, workerCount)
	for len(points) < workerCount {
		points = append(points, newBinlogPoint(minCheckpoint, minCheckpoint))
	}
	return points
}
3 changes: 3 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,10 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
var wg sync.WaitGroup
s.cfg.WorkerCount = 1
syncer := NewSyncer(s.cfg)
syncer.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint)
syncer.jobs = []chan *job{make(chan *job, 1)}
syncer.workerCheckpoints = makeWorkerCheckpointArray(1)
syncer.flushCheckpointChan = make(chan FlushType, 16)
syncer.queueBucketMapping = []string{"queue_0", adminQueueName}

wg.Add(1)
Expand Down
10 changes: 10 additions & 0 deletions tests/dmctl_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,21 @@ function run() {
server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index)
run_sql "show binary logs\G" $MYSQL_PORT1 $MYSQL_PASSWORD1
max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}')
echo "max_binlog_name: $max_binlog_name"
binlog_count=$(grep Log_name "$SQL_RESULT_FILE" | wc -l)
echo "binlog_count: $binlog_count"
relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1))
echo "relay_log_count: $relay_log_count"
[ "$binlog_count" -eq "$relay_log_count" ]

# use ddl to flush checkpoint
run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2
sleep 3

purge_relay_success $max_binlog_name 127.0.0.1:$WORKER1_PORT
new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1))
echo "new_relay_log_count: $new_relay_log_count"
[ "$new_relay_log_count" -eq 1 ]
}

Expand Down
5 changes: 4 additions & 1 deletion tests/retry_cancel/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ function run() {

# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# execute ddl to make sure sync checkpoint is flushed
run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2
sleep 3
# ---------- test for incremental replication ----------
# stop DM-worker, then enable failpoints
kill_dm_worker
Expand Down