diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 031108a650..b5d398650f 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -356,9 +356,8 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) { // FlushPointsExcept implements CheckPoint.FlushPointsExcept func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { - cp.RLock() - defer cp.RUnlock() - + cp.Lock() + defer cp.Unlock() // convert slice to map excepts := make(map[string]map[string]struct{}) for _, schemaTable := range exceptTables { @@ -414,7 +413,6 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl for _, point := range points { point.flush() } - cp.globalPointSaveTime = time.Now() return nil } diff --git a/syncer/job.go b/syncer/job.go index c67cec147b..878a01ed4a 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -33,6 +33,7 @@ const ( flush skip // used by Syncer.recordSkipSQLsPos to record global pos, but not execute SQL rotate + fake // used to flush worker's checkpoint ) func (t opType) String() string { @@ -53,6 +54,8 @@ func (t opType) String() string { return "skip" case rotate: return "rotate" + case fake: + return "fake" } return "" @@ -153,6 +156,13 @@ func newFlushJob() *job { } } +func newFakeJob(pos mysql.Position) *job { + return &job{ + tp: fake, + pos: pos, + } +} + func newSkipJob(pos mysql.Position, currentGtidSet gtid.Set) *job { return &job{ tp: skip, diff --git a/syncer/syncer.go b/syncer/syncer.go index fc0d4ab64f..e9b5fc1d28 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -83,6 +83,26 @@ const ( LocalBinlog ) +// FlushType represents flush checkpoint type +type FlushType uint8 + +// flush checkpoint type +const ( + // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly + NoNeedUpdate FlushType = iota + 1 + // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, + // we send a NeedUpdate request to async flush checkpoint go routine. + // It will update global checkpoint to minPos of workers and then flush checkpoint + NeedUpdate +) + +const ( + // GetMinPos gets min pos from checkpoints + GetMinPos int = iota + 1 + // GetMaxPos gets max pos from checkpoints + GetMaxPos +) + // StreamerProducer provides the ability to generate binlog streamer by StartSync() // but go-mysql StartSync() returns (struct, err) rather than (interface, err) // And we can't simplely use StartSync() method in SteamerProducer @@ -161,6 +181,10 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string + workerCheckpoints []*binlogPoint + flushCheckpointChan chan FlushType + lastAddedJobPos *binlogPoint + c *causality tableRouter *router.Table @@ -502,6 +526,9 @@ func (s *Syncer) reset() { s.resetReplicationSyncer() // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint) + s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount) + s.flushCheckpointChan = make(chan FlushType, 16) // clear tables info s.clearAllTables() @@ -736,21 +763,25 @@ func (s *Syncer) checkWait(job *job) bool { return true } - if s.checkpoint.CheckGlobalPoint() { - return true - } - return false } func (s *Syncer) addJob(job *job) error { + defer func() { + if job.tp == insert || job.tp == update || job.tp == del { + pos := job.pos + if s.cfg.IsSharding { + pos = s.sgk.AdjustGlobalPoint(pos) + } + s.lastAddedJobPos.save(pos) + } + }() var ( queueBucket int execDDLReq *pb.ExecDDLRequest ) switch job.tp { case xid: - s.saveGlobalPoint(job.pos) return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -764,7 +795,11 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - return s.flushCheckPoints() + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + return nil case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -814,8 +849,16 @@ func (s *Syncer) addJob(job *job) error { } } - if wait { - return s.flushCheckPoints() + if s.checkpoint.CheckGlobalPoint() { + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + } else if wait { + select { + case <-s.done: + case s.flushCheckpointChan <- NoNeedUpdate: + } } return nil @@ -945,13 +988,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { +func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, + jobChan chan *job, workerCheckpoint *binlogPoint) { defer s.wg.Done() idx := 0 count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) + lastUpdateCheckpointTime := time.Now() clearF := func() { for i := 0; i < idx; i++ { @@ -988,6 +1033,9 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if err != nil { errCtx := &ExecErrorContext{err, jobs[affected].currentPos, fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) + } else { + workerCheckpoint.save(jobs[len(jobs)-1].pos) + lastUpdateCheckpointTime = time.Now() } if s.tracer.Enable() { syncerJobState := s.tracer.FinishedSyncerJobState(err) @@ -1009,8 +1057,16 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if !ok { return } - idx++ + if sqlJob.tp == fake { + // update pos only if no jobs are waiting to be executed + if len(jobs) == 0 { + workerCheckpoint.save(sqlJob.pos) + lastUpdateCheckpointTime = time.Now() + } + continue + } + idx++ if sqlJob.tp != flush && len(sqlJob.sql) > 0 { jobs = append(jobs, sqlJob) tpCnt[sqlJob.tp]++ @@ -1033,9 +1089,50 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo continue } clearF() + } else if time.Now().Sub(lastUpdateCheckpointTime) > 3*time.Second { + jobChan <- newFakeJob(s.lastAddedJobPos.MySQLPos()) + } + } + } +} + +func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { + updatePos := minCheckpoint + for _, workerCheckpoint := range s.workerCheckpoints { + pos := workerCheckpoint.MySQLPos() + if pos.Compare(minCheckpoint) > 0 { + shouldUpdate := updatePos.Compare(minCheckpoint) == 0 || + (typ == GetMinPos && pos.Compare(updatePos) < 0) || + (typ == GetMaxPos && pos.Compare(updatePos) > 0) + if shouldUpdate { + updatePos = pos } } } + if updatePos.Compare(minCheckpoint) > 0 { + s.saveGlobalPoint(updatePos) + } +} + +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { + for { + select { + case flushType := <-flushChan: + if flushType == NeedUpdate { + s.updateGlobalCheckpointFromWorkers(GetMinPos) + } + err := s.flushCheckPoints() + if err != nil { + if !utils.IsContextCanceledError(err) { + s.runFatalChan <- unit.NewProcessError(pb.ErrorType_UnknownError, err) + return + } + } + case <-ctx.Done(): + s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err())) + return + } + } } // redirectStreamer redirects binlog stream to given position @@ -1096,11 +1193,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) { go func(i int, n string) { ctx2, cancel := context.WithCancel(ctx) ctctx := s.tctx.WithContext(ctx2) - s.sync(ctctx, n, s.toDBConns[i], s.jobs[i]) + s.sync(ctctx, n, s.toDBConns[i], s.jobs[i], s.workerCheckpoints[i]) cancel() }(i, name) } + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) + }() + s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) s.wg.Add(1) go func() { @@ -1124,6 +1227,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) if err2 := s.flushCheckPoints(); err2 != nil { s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } @@ -1896,6 +2000,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if ddlExecItem.req.Exec { failpoint.Inject("ShardSyncedExecutionExit", func() { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) }) @@ -1905,6 +2011,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // exit in the first round sequence sharding DDL only if group.meta.ActiveIdx() == 1 { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) } @@ -2311,8 +2419,9 @@ func (s *Syncer) Pause() { s.tctx.L().Warn("try to pause, but already closed") return } - + s.Lock() s.stopSync() + s.Unlock() } // Resume resumes the paused process @@ -2530,3 +2639,11 @@ func (s *Syncer) setTimezone() { s.tctx.L().Info("use timezone", log.WrapStringerField("location", loc)) s.timezone = loc } + +func makeWorkerCheckpointArray(workerCount int) []*binlogPoint { + workerCheckpoints := make([]*binlogPoint, 0, workerCount) + for i := 0; i < workerCount; i++ { + workerCheckpoints = append(workerCheckpoints, newBinlogPoint(minCheckpoint, minCheckpoint)) + } + return workerCheckpoints +} diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 9f86ac5c1c..e8a9d9271f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1035,7 +1035,10 @@ func (s *testSyncerSuite) TestCasuality(c *C) { var wg sync.WaitGroup s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg) + syncer.lastAddedJobPos = newBinlogPoint(minCheckpoint, minCheckpoint) syncer.jobs = []chan *job{make(chan *job, 1)} + syncer.workerCheckpoints = makeWorkerCheckpointArray(1) + syncer.flushCheckpointChan = make(chan FlushType, 16) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index bd7831b1c6..8022c1e7e2 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -176,11 +176,21 @@ function run() { server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql "show binary logs\G" $MYSQL_PORT1 $MYSQL_PASSWORD1 max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}') + echo "max_binlog_name: $max_binlog_name" binlog_count=$(grep Log_name "$SQL_RESULT_FILE" | wc -l) + echo "binlog_count: $binlog_count" relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "relay_log_count: $relay_log_count" [ "$binlog_count" -eq "$relay_log_count" ] + + # use ddl to flush checkpoint + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 + purge_relay_success $max_binlog_name 127.0.0.1:$WORKER1_PORT new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "new_relay_log_count: $new_relay_log_count" [ "$new_relay_log_count" -eq 1 ] } diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh index 725a288296..05e3cf6a81 100755 --- a/tests/retry_cancel/run.sh +++ b/tests/retry_cancel/run.sh @@ -92,7 +92,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - + # execute ddl to make sure sync checkpoint is flushed + run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 # ---------- test for incremental replication ---------- # stop DM-worker, then enable failponits kill_dm_worker