From 8eaa43e7f5a915e0d8fb2ffa52c147d5b74480d5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 12 Apr 2020 21:30:46 +0800 Subject: [PATCH 01/15] add async flush checkpoint --- syncer/syncer.go | 70 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index a4fc437812..7af3424752 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -83,6 +83,13 @@ const ( LocalBinlog ) +type FlushType uint8 + +const ( + NoNeedUpdate FlushType = iota + 1 + NeedUpdate +) + // StreamerProducer provides the ability to generate binlog streamer by StartSync() // but go-mysql StartSync() returns (struct, err) rather than (interface, err) // And we can't simplely use StartSync() method in SteamerProducer @@ -161,6 +168,9 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string + workerCheckpoints []*binlogPoint + flushCheckpointChan chan FlushType + c *causality tableRouter *router.Table @@ -502,6 +512,7 @@ func (s *Syncer) reset() { s.resetReplicationSyncer() // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.flushCheckpointChan = make(chan FlushType, 16) // clear tables info s.clearAllTables() @@ -736,10 +747,6 @@ func (s *Syncer) checkWait(job *job) bool { return true } - if s.checkpoint.CheckGlobalPoint() { - return true - } - return false } @@ -761,7 +768,8 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - return s.flushCheckPoints() + s.flushCheckpointChan <- NoNeedUpdate + return nil case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -790,6 +798,9 @@ func (s *Syncer) addJob(job *job) error { s.jobWg.Wait() s.c.reset() } + if s.checkpoint.CheckGlobalPoint() { + s.flushCheckpointChan <- NeedUpdate + } switch job.tp { case ddl: @@ -808,7 +819,7 @@ func (s *Syncer) addJob(job *job) error { } if wait { - return s.flushCheckPoints() + s.flushCheckpointChan <- NoNeedUpdate } return nil @@ -938,7 +949,8 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { +func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, + jobChan chan *job, workerCheckpoint *binlogPoint) { defer s.wg.Done() idx := 0 @@ -981,6 +993,8 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if err != nil { errCtx := &ExecErrorContext{err, jobs[affected].currentPos, fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) + } else { + err = workerCheckpoint.save(jobs[len(jobs)-1].pos) } if s.tracer.Enable() { syncerJobState := s.tracer.FinishedSyncerJobState(err) @@ -1031,6 +1045,39 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo } } +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { + defer func() { + close(flushChan) + s.wg.Done() + }() + for { + select { + case flushType := <-flushChan: + if flushType == NeedUpdate { + minPos := s.workerCheckpoints[0].MySQLPos() + for _, workerCheckpoint := range s.workerCheckpoints { + pos := workerCheckpoint.MySQLPos() + if minPos.Compare(pos) < 0 { + minPos = pos + } + } + // minPos will be greater than global checkpoint + s.saveGlobalPoint(minPos) + } + err := s.flushCheckPoints() + if err != nil { + if !utils.IsContextCanceledError(err) { + s.runFatalChan <- unit.NewProcessError(pb.ErrorType_UnknownError, err) + return + } + } + case <-ctx.Done(): + s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err())) + return + } + } +} + // redirectStreamer redirects binlog stream to given position func (s *Syncer) redirectStreamer(pos mysql.Position) error { var err error @@ -1089,11 +1136,18 @@ func (s *Syncer) Run(ctx context.Context) (err error) { go func(i int, n string) { ctx2, cancel := context.WithCancel(ctx) ctctx := s.tctx.WithContext(ctx2) - s.sync(ctctx, n, s.toDBConns[i], s.jobs[i]) + s.sync(ctctx, n, s.toDBConns[i], s.jobs[i], s.workerCheckpoints[i]) cancel() }(i, name) } + s.wg.Add(1) + go func() { + ctx2, cancel := context.WithCancel(ctx) + s.asyncFlushCheckpoint(ctx2, s.flushCheckpointChan) + cancel() + }() + s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) s.wg.Add(1) go func() { From 0796a04c85b4cc41cd4fe7967e1ef14e602aacda Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 12 Apr 2020 21:36:35 +0800 Subject: [PATCH 02/15] fix --- syncer/syncer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/syncer/syncer.go b/syncer/syncer.go index 7af3424752..2dd95d1e55 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -83,8 +83,10 @@ const ( LocalBinlog ) +// FlushType represents flush checkpoint type type FlushType uint8 +// flush checkpoint type const ( NoNeedUpdate FlushType = iota + 1 NeedUpdate From fbcea7fad87559a3c31e66ebe5f4d1ffb4709ddc Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 12 Apr 2020 22:51:16 +0800 Subject: [PATCH 03/15] update syncer --- syncer/syncer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/syncer/syncer.go b/syncer/syncer.go index 2841b85180..5b0c7f581e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -327,6 +327,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return err } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) + s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount) s.bwList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BWList) if err != nil { @@ -2586,3 +2587,11 @@ func (s *Syncer) setTimezone() { s.tctx.L().Info("use timezone", log.WrapStringerField("location", loc)) s.timezone = loc } + +func makeWorkerCheckpointArray(workerCount int) []*binlogPoint { + workerCheckpoints := make([]*binlogPoint, 0, workerCount) + for i := 0; i < workerCount; i++ { + workerCheckpoints = append(workerCheckpoints, newBinlogPoint(minCheckpoint, minCheckpoint)) + } + return workerCheckpoints +} From 4b68617d3f1f26afc84202bccfff0bea9d744d4f Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 09:49:38 +0800 Subject: [PATCH 04/15] fix --- syncer/syncer.go | 22 ++++++++++++++-------- syncer/syncer_test.go | 2 ++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 5b0c7f581e..679e0cba2a 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -327,7 +327,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return err } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) - s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount) s.bwList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BWList) if err != nil { @@ -515,6 +514,7 @@ func (s *Syncer) reset() { s.resetReplicationSyncer() // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount) s.flushCheckpointChan = make(chan FlushType, 16) // clear tables info s.clearAllTables() @@ -774,7 +774,10 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - s.flushCheckpointChan <- NoNeedUpdate + select { + case <-s.done: + case s.flushCheckpointChan <- NoNeedUpdate: + } return nil case ddl: s.jobWg.Wait() @@ -809,7 +812,10 @@ func (s *Syncer) addJob(job *job) error { s.c.reset() } if s.checkpoint.CheckGlobalPoint() { - s.flushCheckpointChan <- NeedUpdate + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } } switch job.tp { @@ -829,7 +835,10 @@ func (s *Syncer) addJob(job *job) error { } if wait { - s.flushCheckpointChan <- NoNeedUpdate + select { + case <-s.done: + case s.flushCheckpointChan <- NoNeedUpdate: + } } return nil @@ -1056,10 +1065,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { - defer func() { - close(flushChan) - s.wg.Done() - }() + defer s.wg.Done() for { select { case flushType := <-flushChan: diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 9f86ac5c1c..f8e0547aa3 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1036,6 +1036,8 @@ func (s *testSyncerSuite) TestCasuality(c *C) { s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg) syncer.jobs = []chan *job{make(chan *job, 1)} + syncer.workerCheckpoints = makeWorkerCheckpointArray(1) + syncer.flushCheckpointChan = make(chan FlushType, 16) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) From d8bf3f10a369ab0aff326c136bc75179108390a5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 10:07:33 +0800 Subject: [PATCH 05/15] fix lock --- syncer/checkpoint.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 031108a650..d4c8b000e2 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -357,7 +357,6 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) { // FlushPointsExcept implements CheckPoint.FlushPointsExcept func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { cp.RLock() - defer cp.RUnlock() // convert slice to map excepts := make(map[string]map[string]struct{}) @@ -406,16 +405,18 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl } _, err := cp.dbConn.executeSQL(tctx, sqls, args...) + cp.RUnlock() if err != nil { return err } + cp.Lock() cp.globalPoint.flush() for _, point := range points { point.flush() } - cp.globalPointSaveTime = time.Now() + cp.Unlock() return nil } From 874a22a55bffb078a6b25d4ad588224b3216b163 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 15:21:19 +0800 Subject: [PATCH 06/15] update fix code --- syncer/syncer.go | 17 ++++++++--------- tests/retry_cancel/run.sh | 5 ++++- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 679e0cba2a..e2af57b266 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -760,7 +760,6 @@ func (s *Syncer) addJob(job *job) error { ) switch job.tp { case xid: - s.saveGlobalPoint(job.pos) return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -811,12 +810,6 @@ func (s *Syncer) addJob(job *job) error { s.jobWg.Wait() s.c.reset() } - if s.checkpoint.CheckGlobalPoint() { - select { - case <-s.done: - case s.flushCheckpointChan <- NeedUpdate: - } - } switch job.tp { case ddl: @@ -834,7 +827,12 @@ func (s *Syncer) addJob(job *job) error { } } - if wait { + if s.checkpoint.CheckGlobalPoint() { + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + } else if wait { select { case <-s.done: case s.flushCheckpointChan <- NoNeedUpdate: @@ -2374,8 +2372,9 @@ func (s *Syncer) Pause() { s.tctx.L().Warn("try to pause, but already closed") return } - + s.Lock() s.stopSync() + s.Unlock() } // Resume resumes the paused process diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh index 725a288296..159f6c92eb 100755 --- a/tests/retry_cancel/run.sh +++ b/tests/retry_cancel/run.sh @@ -92,7 +92,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - + # execute ddl to make sure sync checkpoint is flushed + run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 # ---------- test for incremental replication ---------- # stop DM-worker, then enable failponits kill_dm_worker From eb7c41870bb0afcd57391627119c147983b9bb29 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 15:26:52 +0800 Subject: [PATCH 07/15] address comment --- syncer/checkpoint.go | 7 ++----- syncer/syncer.go | 11 ++++------- tests/retry_cancel/run.sh | 2 +- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index d4c8b000e2..b5d398650f 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -356,8 +356,8 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) { // FlushPointsExcept implements CheckPoint.FlushPointsExcept func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { - cp.RLock() - + cp.Lock() + defer cp.Unlock() // convert slice to map excepts := make(map[string]map[string]struct{}) for _, schemaTable := range exceptTables { @@ -405,18 +405,15 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl } _, err := cp.dbConn.executeSQL(tctx, sqls, args...) - cp.RUnlock() if err != nil { return err } - cp.Lock() cp.globalPoint.flush() for _, point := range points { point.flush() } cp.globalPointSaveTime = time.Now() - cp.Unlock() return nil } diff --git a/syncer/syncer.go b/syncer/syncer.go index e2af57b266..f5b0a923c9 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -775,7 +775,7 @@ func (s *Syncer) addJob(job *job) error { finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() select { case <-s.done: - case s.flushCheckpointChan <- NoNeedUpdate: + case s.flushCheckpointChan <- NeedUpdate: } return nil case ddl: @@ -1063,7 +1063,6 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { - defer s.wg.Done() for { select { case flushType := <-flushChan: @@ -1071,11 +1070,10 @@ func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushT minPos := s.workerCheckpoints[0].MySQLPos() for _, workerCheckpoint := range s.workerCheckpoints { pos := workerCheckpoint.MySQLPos() - if minPos.Compare(pos) < 0 { + if pos.Compare(minPos) < 0 { minPos = pos } } - // minPos will be greater than global checkpoint s.saveGlobalPoint(minPos) } err := s.flushCheckPoints() @@ -1157,9 +1155,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.wg.Add(1) go func() { - ctx2, cancel := context.WithCancel(ctx) - s.asyncFlushCheckpoint(ctx2, s.flushCheckpointChan) - cancel() + defer s.wg.Done() + s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) }() s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh index 159f6c92eb..05e3cf6a81 100755 --- a/tests/retry_cancel/run.sh +++ b/tests/retry_cancel/run.sh @@ -93,7 +93,7 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml # execute ddl to make sure sync checkpoint is flushed - run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 sleep 3 # ---------- test for incremental replication ---------- From a78919a6a896f3716763e0f051c9aae1745c6d67 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 15:32:40 +0800 Subject: [PATCH 08/15] add comment --- syncer/syncer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/syncer/syncer.go b/syncer/syncer.go index f5b0a923c9..25e079bbc4 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -88,7 +88,11 @@ type FlushType uint8 // flush checkpoint type const ( + // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly NoNeedUpdate FlushType = iota + 1 + // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, + // we send a NeedUpdate request to async flush checkpoint go routine. + // It will update global checkpoint to minPos of workers and then flush checkpoint NeedUpdate ) From 7d1ab7cc7f32bfd00cd0fe386d5066acc2fee7be Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 13 Apr 2020 20:44:54 +0800 Subject: [PATCH 09/15] fix update --- syncer/syncer.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 25e079bbc4..be04637657 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1066,19 +1066,23 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } } +func (s *Syncer) updateGlobalCheckpointFromWorkers() { + minPos := s.workerCheckpoints[0].MySQLPos() + for _, workerCheckpoint := range s.workerCheckpoints { + pos := workerCheckpoint.MySQLPos() + if pos.Compare(minPos) < 0 { + minPos = pos + } + } + s.saveGlobalPoint(minPos) +} + func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { for { select { case flushType := <-flushChan: if flushType == NeedUpdate { - minPos := s.workerCheckpoints[0].MySQLPos() - for _, workerCheckpoint := range s.workerCheckpoints { - pos := workerCheckpoint.MySQLPos() - if pos.Compare(minPos) < 0 { - minPos = pos - } - } - s.saveGlobalPoint(minPos) + s.updateGlobalCheckpointFromWorkers() } err := s.flushCheckPoints() if err != nil { @@ -1186,6 +1190,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers() if err2 := s.flushCheckPoints(); err2 != nil { s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } From 329ff7919240ea817ae7cc4dbe261b7845477002 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 00:04:24 +0800 Subject: [PATCH 10/15] update syncer --- syncer/syncer.go | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index be04637657..f044b579bd 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -96,6 +96,13 @@ const ( NeedUpdate ) +const ( + // GetMinPos gets min pos from checkpoints + GetMinPos int = iota + 1 + // GetMaxPos gets max pos from checkpoints + GetMaxPos +) + // StreamerProducer provides the ability to generate binlog streamer by StartSync() // but go-mysql StartSync() returns (struct, err) rather than (interface, err) // And we can't simplely use StartSync() method in SteamerProducer @@ -1066,15 +1073,22 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) updateGlobalCheckpointFromWorkers() { - minPos := s.workerCheckpoints[0].MySQLPos() +func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { + updatePos := minCheckpoint for _, workerCheckpoint := range s.workerCheckpoints { pos := workerCheckpoint.MySQLPos() - if pos.Compare(minPos) < 0 { - minPos = pos + if pos.Compare(minCheckpoint) > 0 { + shouldUpdate := updatePos.Compare(minCheckpoint) == 0 || + (typ == GetMinPos && pos.Compare(updatePos) < 0) || + (typ == GetMaxPos && pos.Compare(updatePos) > 0) + if shouldUpdate { + updatePos = pos + } } } - s.saveGlobalPoint(minPos) + if updatePos.Compare(minCheckpoint) > 0 { + s.saveGlobalPoint(updatePos) + } } func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { @@ -1082,7 +1096,7 @@ func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushT select { case flushType := <-flushChan: if flushType == NeedUpdate { - s.updateGlobalCheckpointFromWorkers() + s.updateGlobalCheckpointFromWorkers(GetMinPos) } err := s.flushCheckPoints() if err != nil { @@ -1190,7 +1204,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.jobWg.Wait() - s.updateGlobalCheckpointFromWorkers() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) if err2 := s.flushCheckPoints(); err2 != nil { s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } @@ -1963,6 +1977,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if ddlExecItem.req.Exec { failpoint.Inject("ShardSyncedExecutionExit", func() { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) }) @@ -1972,6 +1988,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // exit in the first round sequence sharding DDL only if group.meta.ActiveIdx() == 1 { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) } From e558110a887c71115a1c538e6f89d1781c22eabe Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 10:29:56 +0800 Subject: [PATCH 11/15] fix dmctl --- tests/dmctl_basic/run.sh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index bd7831b1c6..8022c1e7e2 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -176,11 +176,21 @@ function run() { server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql "show binary logs\G" $MYSQL_PORT1 $MYSQL_PASSWORD1 max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}') + echo "max_binlog_name: $max_binlog_name" binlog_count=$(grep Log_name "$SQL_RESULT_FILE" | wc -l) + echo "binlog_count: $binlog_count" relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "relay_log_count: $relay_log_count" [ "$binlog_count" -eq "$relay_log_count" ] + + # use ddl to flush checkpoint + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 + purge_relay_success $max_binlog_name 127.0.0.1:$WORKER1_PORT new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "new_relay_log_count: $new_relay_log_count" [ "$new_relay_log_count" -eq 1 ] } From 9a43d5580e4d1503e08485061efb29c428d91000 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 12:29:46 +0800 Subject: [PATCH 12/15] add fake job mechanism --- syncer/job.go | 10 ++++++++++ syncer/syncer.go | 18 ++++++++++++++++++ syncer/syncer_test.go | 1 + 3 files changed, 29 insertions(+) diff --git a/syncer/job.go b/syncer/job.go index c67cec147b..878a01ed4a 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -33,6 +33,7 @@ const ( flush skip // used by Syncer.recordSkipSQLsPos to record global pos, but not execute SQL rotate + fake // used to flush worker's checkpoint ) func (t opType) String() string { @@ -53,6 +54,8 @@ func (t opType) String() string { return "skip" case rotate: return "rotate" + case fake: + return "fake" } return "" @@ -153,6 +156,13 @@ func newFlushJob() *job { } } +func newFakeJob(pos mysql.Position) *job { + return &job{ + tp: fake, + pos: pos, + } +} + func newSkipJob(pos mysql.Position, currentGtidSet gtid.Set) *job { return &job{ tp: skip, diff --git a/syncer/syncer.go b/syncer/syncer.go index f044b579bd..52238c975b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -183,6 +183,7 @@ type Syncer struct { workerCheckpoints []*binlogPoint flushCheckpointChan chan FlushType + lastAddedJobPos *binlogPoint c *causality @@ -525,6 +526,7 @@ func (s *Syncer) reset() { s.resetReplicationSyncer() // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount) s.flushCheckpointChan = make(chan FlushType, 16) // clear tables info @@ -765,6 +767,7 @@ func (s *Syncer) checkWait(job *job) bool { } func (s *Syncer) addJob(job *job) error { + defer s.lastAddedJobPos.save(job.pos) var ( queueBucket int execDDLReq *pb.ExecDDLRequest @@ -985,6 +988,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) + lastUpdateCheckpointTime := time.Now() clearF := func() { for i := 0; i < idx; i++ { @@ -1023,6 +1027,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, s.appendExecErrors(errCtx) } else { err = workerCheckpoint.save(jobs[len(jobs)-1].pos) + lastUpdateCheckpointTime = time.Now() } if s.tracer.Enable() { syncerJobState := s.tracer.FinishedSyncerJobState(err) @@ -1046,6 +1051,17 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } idx++ + if sqlJob.tp == fake { + // update pos only if no jobs are waiting to be executed + if len(jobs) == 0 { + err = workerCheckpoint.save(sqlJob.pos) + if err != nil { + fatalF(err, pb.ErrorType_UnknownError) + } + lastUpdateCheckpointTime = time.Now() + } + continue + } if sqlJob.tp != flush && len(sqlJob.sql) > 0 { jobs = append(jobs, sqlJob) tpCnt[sqlJob.tp]++ @@ -1068,6 +1084,8 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, continue } clearF() + } else if time.Now().Sub(lastUpdateCheckpointTime) > 3*time.Second { + jobChan <- newFakeJob(s.lastAddedJobPos.MySQLPos()) } } } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index f8e0547aa3..e8a9d9271f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1035,6 +1035,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { var wg sync.WaitGroup s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg) + syncer.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.workerCheckpoints = makeWorkerCheckpointArray(1) syncer.flushCheckpointChan = make(chan FlushType, 16) From a622e54fc7987544b02f8f6c3454a81e5942f7f3 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 16:31:56 +0800 Subject: [PATCH 13/15] update syncer --- syncer/syncer.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 52238c975b..d723e7c246 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -767,7 +767,11 @@ func (s *Syncer) checkWait(job *job) bool { } func (s *Syncer) addJob(job *job) error { - defer s.lastAddedJobPos.save(job.pos) + defer func() { + if job.tp == insert || job.tp == update || job.tp == del { + s.lastAddedJobPos.save(job.pos) + } + }() var ( queueBucket int execDDLReq *pb.ExecDDLRequest @@ -1026,7 +1030,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, errCtx := &ExecErrorContext{err, jobs[affected].currentPos, fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) } else { - err = workerCheckpoint.save(jobs[len(jobs)-1].pos) + workerCheckpoint.save(jobs[len(jobs)-1].pos) lastUpdateCheckpointTime = time.Now() } if s.tracer.Enable() { @@ -1049,19 +1053,16 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, if !ok { return } - idx++ - if sqlJob.tp == fake { // update pos only if no jobs are waiting to be executed if len(jobs) == 0 { - err = workerCheckpoint.save(sqlJob.pos) - if err != nil { - fatalF(err, pb.ErrorType_UnknownError) - } + workerCheckpoint.save(sqlJob.pos) lastUpdateCheckpointTime = time.Now() } continue } + + idx++ if sqlJob.tp != flush && len(sqlJob.sql) > 0 { jobs = append(jobs, sqlJob) tpCnt[sqlJob.tp]++ From e829019b37aa071d3a782a6cdf419685d2ff7b8e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 17:45:47 +0800 Subject: [PATCH 14/15] add adjust for fake binlog --- syncer/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index d723e7c246..6ef97e33ac 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -769,7 +769,7 @@ func (s *Syncer) checkWait(job *job) bool { func (s *Syncer) addJob(job *job) error { defer func() { if job.tp == insert || job.tp == update || job.tp == del { - s.lastAddedJobPos.save(job.pos) + s.lastAddedJobPos.save(s.sgk.AdjustGlobalPoint(job.pos)) } }() var ( From 3c03b46d84b71f0853ec2e03735579a33d3682f1 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 17:58:41 +0800 Subject: [PATCH 15/15] fix again --- syncer/syncer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 6ef97e33ac..e9b5fc1d28 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -769,7 +769,11 @@ func (s *Syncer) checkWait(job *job) bool { func (s *Syncer) addJob(job *job) error { defer func() { if job.tp == insert || job.tp == update || job.tp == del { - s.lastAddedJobPos.save(s.sgk.AdjustGlobalPoint(job.pos)) + pos := job.pos + if s.cfg.IsSharding { + pos = s.sgk.AdjustGlobalPoint(pos) + } + s.lastAddedJobPos.save(pos) } }() var (