From ee41576ca46ee5cffb3733c98cc977fa97167a75 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 20:14:22 +0800 Subject: [PATCH 01/16] add async flush checkpoint feature --- syncer/checkpoint.go | 4 +- syncer/job.go | 10 +++ syncer/syncer.go | 142 +++++++++++++++++++++++++++++++++++--- syncer/syncer_test.go | 3 + tests/dmctl_basic/run.sh | 10 +++ tests/retry_cancel/run.sh | 5 +- 6 files changed, 160 insertions(+), 14 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index fa1dceaf3a..f0983e3221 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -405,8 +405,8 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location) { // FlushPointsExcept implements CheckPoint.FlushPointsExcept func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { - cp.RLock() - defer cp.RUnlock() + cp.Lock() + defer cp.Unlock() // convert slice to map excepts := make(map[string]map[string]struct{}) diff --git a/syncer/job.go b/syncer/job.go index aa7ba37c7f..5cc3148100 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -31,6 +31,7 @@ const ( flush skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL rotate + fake // used to flush worker's checkpoint ) func (t opType) String() string { @@ -51,6 +52,8 @@ func (t opType) String() string { return "skip" case rotate: return "rotate" + case fake: + return "fake" } return "" @@ -138,6 +141,13 @@ func newFlushJob() *job { } } +func newFakeJob(location binlog.Location) *job { + return &job{ + tp: fake, + location: location.Clone(), + } +} + func newSkipJob(location binlog.Location) *job { location1 := location.Clone() diff --git a/syncer/syncer.go b/syncer/syncer.go index 9af448a31b..0c552c38c4 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -88,6 +88,26 @@ const ( LocalBinlog ) +// FlushType represents flush checkpoint type +type FlushType uint8 + +// flush checkpoint type +const ( + // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly + NoNeedUpdate FlushType = iota + 1 + // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, + // we send a NeedUpdate request to async flush checkpoint go routine. + // It will update global checkpoint to minPos of workers and then flush checkpoint + NeedUpdate +) + +const ( + // GetMinPos gets min pos from checkpoints + GetMinPos int = iota + 1 + // GetMaxPos gets max pos from checkpoints + GetMaxPos +) + // Syncer can sync your MySQL data to another MySQL database. type Syncer struct { sync.RWMutex @@ -123,6 +143,10 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string + workerCheckpoints []*binlogPoint + flushCheckpointChan chan FlushType + lastAddedJobPos *binlogPoint + c *causality tableRouter *router.Table @@ -442,6 +466,9 @@ func (s *Syncer) reset() { } // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.lastAddedJobPos = newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) + s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) + s.flushCheckpointChan = make(chan FlushType, 16) s.execErrorDetected.Set(false) s.resetExecErrors() @@ -682,10 +709,6 @@ func (s *Syncer) checkWait(job *job) bool { return true } - if s.checkpoint.CheckGlobalPoint() { - return true - } - return false } @@ -702,12 +725,20 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { } func (s *Syncer) addJob(job *job) error { + defer func() { + if job.tp == insert || job.tp == update || job.tp == del { + pos := job.location.Clone() + if s.cfg.IsSharding { + pos = s.sgk.AdjustGlobalLocation(pos) + } + s.lastAddedJobPos.save(pos, nil) + } + }() var ( queueBucket int ) switch job.tp { case xid: - s.saveGlobalPoint(job.location) return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -718,7 +749,11 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - return s.flushCheckPoints() + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + return nil case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -754,8 +789,16 @@ func (s *Syncer) addJob(job *job) error { } } - if wait { - return s.flushCheckPoints() + if s.checkpoint.CheckGlobalPoint() { + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + } else if wait { + select { + case <-s.done: + case s.flushCheckpointChan <- NoNeedUpdate: + } } return nil @@ -911,13 +954,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { +func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, + jobChan chan *job, workerCheckpoint *binlogPoint) { defer s.wg.Done() idx := 0 count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) + lastUpdateCheckpointTime := time.Now() clearF := func() { for i := 0; i < idx; i++ { @@ -954,6 +999,9 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if err != nil { errCtx := &ExecErrorContext{err, jobs[affected].currentLocation.Clone(), fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) + } else { + workerCheckpoint.save(jobs[len(jobs)-1].location.Clone(), nil) + lastUpdateCheckpointTime = time.Now() } return err } @@ -965,6 +1013,15 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if !ok { return } + if sqlJob.tp == fake { + // update pos only if no jobs are waiting to be executed + if len(jobs) == 0 { + workerCheckpoint.save(sqlJob.location.Clone(), nil) + lastUpdateCheckpointTime = time.Now() + } + continue + } + idx++ if sqlJob.tp != flush && len(sqlJob.sql) > 0 { @@ -990,12 +1047,55 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo } clearF() } else { + if time.Now().Sub(lastUpdateCheckpointTime) > 3*time.Second { + jobChan <- newFakeJob(s.lastAddedJobPos.MySQLLocation()) + } time.Sleep(waitTime) } } } } +func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { + minPos := binlog.NewLocation(s.cfg.Flavor) + updatePos := minPos.Clone() + for _, workerCheckpoint := range s.workerCheckpoints { + pos := workerCheckpoint.MySQLLocation() + if binlog.CompareLocation(pos, minPos) > 0 { + shouldUpdate := binlog.CompareLocation(updatePos, minPos) == 0 || + (typ == GetMinPos && binlog.CompareLocation(pos, updatePos) < 0) || + (typ == GetMaxPos && binlog.CompareLocation(pos, updatePos) > 0) + if shouldUpdate { + updatePos = pos + } + } + } + if binlog.CompareLocation(updatePos, minPos) > 0 { + s.saveGlobalPoint(updatePos) + } +} + +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { + for { + select { + case flushType := <-flushChan: + if flushType == NeedUpdate { + s.updateGlobalCheckpointFromWorkers(GetMinPos) + } + err := s.flushCheckPoints() + if err != nil { + if !utils.IsContextCanceledError(err) { + s.runFatalChan <- unit.NewProcessError(err) + return + } + } + case <-ctx.Done(): + s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err())) + return + } + } +} + // Run starts running for sync, we should guarantee it can rerun when paused. func (s *Syncer) Run(ctx context.Context) (err error) { tctx := s.tctx.WithContext(ctx) @@ -1047,11 +1147,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) { go func(i int, n string) { ctx2, cancel := context.WithCancel(ctx) ctctx := s.tctx.WithContext(ctx2) - s.sync(ctctx, n, s.toDBConns[i], s.jobs[i]) + s.sync(ctctx, n, s.toDBConns[i], s.jobs[i], s.workerCheckpoints[i]) cancel() }(i, name) } + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) + }() + s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) s.wg.Add(1) go func() { @@ -1075,6 +1181,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) if err2 := s.flushCheckPoints(); err2 != nil { s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } @@ -1862,6 +1969,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if shardOp.Exec { failpoint.Inject("ShardSyncedExecutionExit", func() { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) }) @@ -1871,6 +1980,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // exit in the first round sequence sharding DDL only if group.meta.ActiveIdx() == 1 { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) } @@ -2269,8 +2380,9 @@ func (s *Syncer) Pause() { s.tctx.L().Warn("try to pause, but already closed") return } - + s.Lock() s.stopSync() + s.Unlock() } // Resume resumes the paused process @@ -2504,3 +2616,11 @@ func (s *Syncer) ShardDDLInfo() *pessimism.Info { func (s *Syncer) ShardDDLOperation() *pessimism.Operation { return s.pessimist.PendingOperation() } + +func makeWorkerCheckpointArray(workerCount int, flavor string) []*binlogPoint { + workerCheckpoints := make([]*binlogPoint, 0, workerCount) + for i := 0; i < workerCount; i++ { + workerCheckpoints = append(workerCheckpoints, newBinlogPoint(binlog.NewLocation(flavor), binlog.NewLocation(flavor), nil, nil)) + } + return workerCheckpoints +} diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index e14739a190..86e98428da 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1040,7 +1040,10 @@ func (s *testSyncerSuite) TestCasuality(c *C) { var wg sync.WaitGroup s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg, nil) + syncer.lastAddedJobPos = newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) syncer.jobs = []chan *job{make(chan *job, 1)} + syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) + syncer.flushCheckpointChan = make(chan FlushType, 16) wg.Add(1) go func() { diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 5c5827e32d..8025f38950 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -225,11 +225,21 @@ function run() { server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql "show binary logs\G" $MYSQL_PORT1 $MYSQL_PASSWORD1 max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}') + echo "max_binlog_name: $max_binlog_name" binlog_count=$(grep Log_name "$SQL_RESULT_FILE" | wc -l) + echo "binlog_count: $binlog_count" relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "relay_log_count: $relay_log_count" [ "$binlog_count" -eq "$relay_log_count" ] + + # use ddl to flush checkpoint + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 + purge_relay_success $max_binlog_name $SOURCE_ID1 new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "new_relay_log_count: $new_relay_log_count" [ "$new_relay_log_count" -eq 1 ] } diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh index 03c32867e8..2f472cde04 100755 --- a/tests/retry_cancel/run.sh +++ b/tests/retry_cancel/run.sh @@ -103,7 +103,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - + # execute ddl to make sure sync checkpoint is flushed + run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 # ---------- test for incremental replication ---------- # stop DM-worker, then enable failponits kill_dm_worker From 0727bd2cae4a0c4fe3ce73a67727e6e6f408c162 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 20:41:34 +0800 Subject: [PATCH 02/16] fix dmctl_basic integration test --- tests/dmctl_basic/run.sh | 45 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 8025f38950..a3c97140fd 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -195,29 +195,28 @@ function run() { update_master_config_success $dm_master_conf cmp $dm_master_conf $cur/conf/dm-master.toml -# TODO: The ddl sharding part for DM-HA still has some problem. This should be uncommented when it's fixed. -# run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 -# set +e -# i=0 -# while [ $i -lt 10 ] -# do -# show_ddl_locks_with_locks "$TASK_NAME-\`dmctl\`.\`t_target\`" "ALTER TABLE \`dmctl\`.\`t_target\` DROP COLUMN \`d\`" -# ((i++)) -# if [ "$?" != 0 ]; then -# echo "wait 1s and check for the $i-th time" -# sleep 1 -# else -# break -# fi -# done -# set -e -# if [ $i -ge 10 ]; then -# echo "show_ddl_locks_with_locks check timeout" -# exit 1 -# fi -# run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 -# check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 -# show_ddl_locks_no_locks $TASK_NAME + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + set +e + i=0 + while [ $i -lt 10 ] + do + show_ddl_locks_with_locks "$TASK_NAME-\`dmctl\`.\`t_target\`" "ALTER TABLE \`dmctl\`.\`t_target\` DROP COLUMN \`d\`" + ((i++)) + if [ "$?" != 0 ]; then + echo "wait 1s and check for the $i-th time" + sleep 1 + else + break + fi + done + set -e + if [ $i -ge 10 ]; then + echo "show_ddl_locks_with_locks check timeout" + exit 1 + fi + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 + show_ddl_locks_no_locks $TASK_NAME # sleep 1s to ensure syncer unit has flushed global checkpoint and updates # updated ActiveRelayLog From 3df7e1156388f4a5191b1e8e5ce554b46e420e58 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 14 Apr 2020 21:09:28 +0800 Subject: [PATCH 03/16] fix check sgk bug --- syncer/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 0c552c38c4..412f7b5e22 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -728,7 +728,7 @@ func (s *Syncer) addJob(job *job) error { defer func() { if job.tp == insert || job.tp == update || job.tp == del { pos := job.location.Clone() - if s.cfg.IsSharding { + if s.cfg.ShardMode == config.ShardPessimistic { pos = s.sgk.AdjustGlobalLocation(pos) } s.lastAddedJobPos.save(pos, nil) From db26f766a97e04c4e96118db9f73063872bb4241 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 16 Apr 2020 11:46:39 +0800 Subject: [PATCH 04/16] fix bug --- syncer/syncer.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 412f7b5e22..0a9cb6487a 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -725,20 +725,19 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { } func (s *Syncer) addJob(job *job) error { - defer func() { - if job.tp == insert || job.tp == update || job.tp == del { - pos := job.location.Clone() - if s.cfg.ShardMode == config.ShardPessimistic { - pos = s.sgk.AdjustGlobalLocation(pos) - } - s.lastAddedJobPos.save(pos, nil) - } - }() - var ( - queueBucket int - ) + var queueBucket int switch job.tp { case xid: + s.jobWg.Add(s.cfg.WorkerCount) + for i := 0; i < s.cfg.WorkerCount; i++ { + s.jobs[i] <- job + } + // save max added xid pos as lastAddedJobPos for fake job + pos := job.location.Clone() + if s.cfg.ShardMode == config.ShardPessimistic { + pos = s.sgk.AdjustGlobalLocation(pos) + } + s.lastAddedJobPos.save(pos, nil) return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -963,6 +962,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) lastUpdateCheckpointTime := time.Now() + lastAddedXidPos := newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) clearF := func() { for i := 0; i < idx; i++ { @@ -1000,7 +1000,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, errCtx := &ExecErrorContext{err, jobs[affected].currentLocation.Clone(), fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) } else { - workerCheckpoint.save(jobs[len(jobs)-1].location.Clone(), nil) + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) lastUpdateCheckpointTime = time.Now() } return err @@ -1021,9 +1021,12 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } continue } - idx++ + if sqlJob.tp == xid { + lastAddedXidPos.save(sqlJob.location.Clone(), nil) + continue + } if sqlJob.tp != flush && len(sqlJob.sql) > 0 { jobs = append(jobs, sqlJob) tpCnt[sqlJob.tp]++ From 90ac4f3b64290280a613e4be6caebf11fa68b248 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 16 Apr 2020 14:54:11 +0800 Subject: [PATCH 05/16] remove fake job, refine xid pos update --- syncer/job.go | 10 ---------- syncer/syncer.go | 31 +++++++------------------------ syncer/syncer_test.go | 1 - 3 files changed, 7 insertions(+), 35 deletions(-) diff --git a/syncer/job.go b/syncer/job.go index 5cc3148100..aa7ba37c7f 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -31,7 +31,6 @@ const ( flush skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL rotate - fake // used to flush worker's checkpoint ) func (t opType) String() string { @@ -52,8 +51,6 @@ func (t opType) String() string { return "skip" case rotate: return "rotate" - case fake: - return "fake" } return "" @@ -141,13 +138,6 @@ func newFlushJob() *job { } } -func newFakeJob(location binlog.Location) *job { - return &job{ - tp: fake, - location: location.Clone(), - } -} - func newSkipJob(location binlog.Location) *job { location1 := location.Clone() diff --git a/syncer/syncer.go b/syncer/syncer.go index 0a9cb6487a..20e11fda69 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -145,7 +145,6 @@ type Syncer struct { workerCheckpoints []*binlogPoint flushCheckpointChan chan FlushType - lastAddedJobPos *binlogPoint c *causality @@ -466,7 +465,6 @@ func (s *Syncer) reset() { } // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) - s.lastAddedJobPos = newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) s.flushCheckpointChan = make(chan FlushType, 16) @@ -728,16 +726,12 @@ func (s *Syncer) addJob(job *job) error { var queueBucket int switch job.tp { case xid: - s.jobWg.Add(s.cfg.WorkerCount) + if s.cfg.ShardMode == config.ShardPessimistic { + job.location = s.sgk.AdjustGlobalLocation(job.location) + } for i := 0; i < s.cfg.WorkerCount; i++ { s.jobs[i] <- job } - // save max added xid pos as lastAddedJobPos for fake job - pos := job.location.Clone() - if s.cfg.ShardMode == config.ShardPessimistic { - pos = s.sgk.AdjustGlobalLocation(pos) - } - s.lastAddedJobPos.save(pos, nil) return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -961,7 +955,6 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) - lastUpdateCheckpointTime := time.Now() lastAddedXidPos := newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) clearF := func() { @@ -987,6 +980,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, executeSQLs := func() error { if len(jobs) == 0 { + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) return nil } queries := make([]string, 0, len(jobs)) @@ -1001,7 +995,6 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, s.appendExecErrors(errCtx) } else { workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) - lastUpdateCheckpointTime = time.Now() } return err } @@ -1013,20 +1006,12 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, if !ok { return } - if sqlJob.tp == fake { - // update pos only if no jobs are waiting to be executed - if len(jobs) == 0 { - workerCheckpoint.save(sqlJob.location.Clone(), nil) - lastUpdateCheckpointTime = time.Now() - } - continue - } - idx++ - if sqlJob.tp == xid { lastAddedXidPos.save(sqlJob.location.Clone(), nil) continue } + idx++ + if sqlJob.tp != flush && len(sqlJob.sql) > 0 { jobs = append(jobs, sqlJob) tpCnt[sqlJob.tp]++ @@ -1050,9 +1035,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } clearF() } else { - if time.Now().Sub(lastUpdateCheckpointTime) > 3*time.Second { - jobChan <- newFakeJob(s.lastAddedJobPos.MySQLLocation()) - } + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) time.Sleep(waitTime) } } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 86e98428da..20d5b6582d 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1040,7 +1040,6 @@ func (s *testSyncerSuite) TestCasuality(c *C) { var wg sync.WaitGroup s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg, nil) - syncer.lastAddedJobPos = newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) syncer.flushCheckpointChan = make(chan FlushType, 16) From d9dbdd1ea97a276845f0d732a2b65458f8a83dd1 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 20 Apr 2020 15:08:03 +0800 Subject: [PATCH 06/16] Serial flush checkpoint for ddl/flush, remove flush checkpoint chan --- syncer/syncer.go | 52 ++++++++++++------------------------------- syncer/syncer_test.go | 1 - 2 files changed, 14 insertions(+), 39 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index d1b0323cd7..368bb9c755 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -88,19 +88,6 @@ const ( LocalBinlog ) -// FlushType represents flush checkpoint type -type FlushType uint8 - -// flush checkpoint type -const ( - // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly - NoNeedUpdate FlushType = iota + 1 - // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, - // we send a NeedUpdate request to async flush checkpoint go routine. - // It will update global checkpoint to minPos of workers and then flush checkpoint - NeedUpdate -) - const ( // GetMinPos gets min pos from checkpoints GetMinPos int = iota + 1 @@ -143,8 +130,7 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string - workerCheckpoints []*binlogPoint - flushCheckpointChan chan FlushType + workerCheckpoints []*binlogPoint c *causality @@ -466,7 +452,6 @@ func (s *Syncer) reset() { // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) - s.flushCheckpointChan = make(chan FlushType, 16) s.execErrorDetected.Set(false) s.resetExecErrors() @@ -727,7 +712,7 @@ func (s *Syncer) addJob(job *job) error { switch job.tp { case xid: if s.cfg.ShardMode == config.ShardPessimistic { - job.location = s.sgk.AdjustGlobalLocation(job.location) + job.currentLocation = s.sgk.AdjustGlobalLocation(job.currentLocation) } for i := 0; i < s.cfg.WorkerCount; i++ { s.jobs[i] <- job @@ -745,11 +730,8 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - select { - case <-s.done: - case s.flushCheckpointChan <- NeedUpdate: - } - return nil + s.updateGlobalCheckpointFromWorkers(GetMinPos) + return s.flushCheckPoints() case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -790,16 +772,8 @@ func (s *Syncer) addJob(job *job) error { } } - if s.checkpoint.CheckGlobalPoint() { - select { - case <-s.done: - case s.flushCheckpointChan <- NeedUpdate: - } - } else if wait { - select { - case <-s.done: - case s.flushCheckpointChan <- NoNeedUpdate: - } + if wait { + return s.flushCheckPoints() } return nil @@ -1016,7 +990,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, return } if sqlJob.tp == xid { - lastAddedXidPos.save(sqlJob.location.Clone(), nil) + lastAddedXidPos.save(sqlJob.currentLocation.Clone(), nil) continue } idx++ @@ -1070,13 +1044,15 @@ func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { } } -func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, checkpointFlushInterval int) { + tick := time.NewTicker(time.Duration(checkpointFlushInterval) * time.Second) for { select { - case flushType := <-flushChan: - if flushType == NeedUpdate { - s.updateGlobalCheckpointFromWorkers(GetMinPos) + case <-tick.C: + if !s.checkpoint.CheckGlobalPoint() { + continue } + s.updateGlobalCheckpointFromWorkers(GetMinPos) err := s.flushCheckPoints() if err != nil { if !utils.IsContextCanceledError(err) { @@ -1150,7 +1126,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.wg.Add(1) go func() { defer s.wg.Done() - s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) + s.asyncFlushCheckpoint(ctx, s.cfg.CheckpointFlushInterval) }() s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index bf4e928fa6..91e20d4285 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1042,7 +1042,6 @@ func (s *testSyncerSuite) TestCasuality(c *C) { syncer := NewSyncer(s.cfg, nil) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) - syncer.flushCheckpointChan = make(chan FlushType, 16) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) From 9489452365217025c2a5ef5b3d78a30844bfdf9d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 20 Apr 2020 18:08:15 +0800 Subject: [PATCH 07/16] Revert "Serial flush checkpoint for ddl/flush, remove flush checkpoint chan" This reverts commit d9dbdd1ea97a276845f0d732a2b65458f8a83dd1. --- syncer/syncer.go | 52 +++++++++++++++++++++++++++++++------------ syncer/syncer_test.go | 1 + 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 368bb9c755..d1b0323cd7 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -88,6 +88,19 @@ const ( LocalBinlog ) +// FlushType represents flush checkpoint type +type FlushType uint8 + +// flush checkpoint type +const ( + // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly + NoNeedUpdate FlushType = iota + 1 + // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, + // we send a NeedUpdate request to async flush checkpoint go routine. + // It will update global checkpoint to minPos of workers and then flush checkpoint + NeedUpdate +) + const ( // GetMinPos gets min pos from checkpoints GetMinPos int = iota + 1 @@ -130,7 +143,8 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string - workerCheckpoints []*binlogPoint + workerCheckpoints []*binlogPoint + flushCheckpointChan chan FlushType c *causality @@ -452,6 +466,7 @@ func (s *Syncer) reset() { // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) + s.flushCheckpointChan = make(chan FlushType, 16) s.execErrorDetected.Set(false) s.resetExecErrors() @@ -712,7 +727,7 @@ func (s *Syncer) addJob(job *job) error { switch job.tp { case xid: if s.cfg.ShardMode == config.ShardPessimistic { - job.currentLocation = s.sgk.AdjustGlobalLocation(job.currentLocation) + job.location = s.sgk.AdjustGlobalLocation(job.location) } for i := 0; i < s.cfg.WorkerCount; i++ { s.jobs[i] <- job @@ -730,8 +745,11 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - s.updateGlobalCheckpointFromWorkers(GetMinPos) - return s.flushCheckPoints() + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + return nil case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -772,8 +790,16 @@ func (s *Syncer) addJob(job *job) error { } } - if wait { - return s.flushCheckPoints() + if s.checkpoint.CheckGlobalPoint() { + select { + case <-s.done: + case s.flushCheckpointChan <- NeedUpdate: + } + } else if wait { + select { + case <-s.done: + case s.flushCheckpointChan <- NoNeedUpdate: + } } return nil @@ -990,7 +1016,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, return } if sqlJob.tp == xid { - lastAddedXidPos.save(sqlJob.currentLocation.Clone(), nil) + lastAddedXidPos.save(sqlJob.location.Clone(), nil) continue } idx++ @@ -1044,15 +1070,13 @@ func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { } } -func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, checkpointFlushInterval int) { - tick := time.NewTicker(time.Duration(checkpointFlushInterval) * time.Second) +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { for { select { - case <-tick.C: - if !s.checkpoint.CheckGlobalPoint() { - continue + case flushType := <-flushChan: + if flushType == NeedUpdate { + s.updateGlobalCheckpointFromWorkers(GetMinPos) } - s.updateGlobalCheckpointFromWorkers(GetMinPos) err := s.flushCheckPoints() if err != nil { if !utils.IsContextCanceledError(err) { @@ -1126,7 +1150,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.wg.Add(1) go func() { defer s.wg.Done() - s.asyncFlushCheckpoint(ctx, s.cfg.CheckpointFlushInterval) + s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) }() s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 91e20d4285..bf4e928fa6 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1042,6 +1042,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { syncer := NewSyncer(s.cfg, nil) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) + syncer.flushCheckpointChan = make(chan FlushType, 16) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) From 848a18f1a70e5d6ac1c2225b0e9193591e07f270 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 20 Apr 2020 21:00:33 +0800 Subject: [PATCH 08/16] add flusher --- syncer/syncer.go | 40 +++++++++++++-------------------------- syncer/syncer_test.go | 1 - syncer/util.go | 44 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index d1b0323cd7..4297bbda80 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -143,8 +143,8 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string - workerCheckpoints []*binlogPoint - flushCheckpointChan chan FlushType + workerCheckpoints []*binlogPoint + flusher *flushHelper c *causality @@ -466,7 +466,6 @@ func (s *Syncer) reset() { // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) - s.flushCheckpointChan = make(chan FlushType, 16) s.execErrorDetected.Set(false) s.resetExecErrors() @@ -702,14 +701,6 @@ func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int6 } } -func (s *Syncer) checkWait(job *job) bool { - if job.tp == ddl { - return true - } - - return false -} - func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { ti, err := s.schemaTracker.GetTable(db, table) if err != nil { @@ -745,10 +736,8 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - select { - case <-s.done: - case s.flushCheckpointChan <- NeedUpdate: - } + s.updateGlobalCheckpointFromWorkers(GetMaxPos) + s.flusher.addFlushRequest(NoNeedUpdate, true) return nil case ddl: s.jobWg.Wait() @@ -768,7 +757,7 @@ func (s *Syncer) addJob(job *job) error { addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds()) } - wait := s.checkWait(job) + wait := job.tp == ddl if wait { s.jobWg.Wait() s.c.reset() @@ -791,15 +780,9 @@ func (s *Syncer) addJob(job *job) error { } if s.checkpoint.CheckGlobalPoint() { - select { - case <-s.done: - case s.flushCheckpointChan <- NeedUpdate: - } + s.flusher.addFlushRequest(NeedUpdate, false) } else if wait { - select { - case <-s.done: - case s.flushCheckpointChan <- NoNeedUpdate: - } + s.flusher.addFlushRequest(NoNeedUpdate, true) } return nil @@ -1070,10 +1053,11 @@ func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { } } -func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushType) { +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flusher *flushHelper) { + defer flusher.close() for { select { - case flushType := <-flushChan: + case flushType := <-flusher.flushCheckpointChan: if flushType == NeedUpdate { s.updateGlobalCheckpointFromWorkers(GetMinPos) } @@ -1084,6 +1068,7 @@ func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flushChan chan FlushT return } } + flusher.finishRequest(flushType) case <-ctx.Done(): s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err())) return @@ -1148,9 +1133,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.wg.Add(1) + s.flusher = newFlusher(ctx) go func() { defer s.wg.Done() - s.asyncFlushCheckpoint(ctx, s.flushCheckpointChan) + s.asyncFlushCheckpoint(ctx, s.flusher) }() s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index bf4e928fa6..91e20d4285 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1042,7 +1042,6 @@ func (s *testSyncerSuite) TestCasuality(c *C) { syncer := NewSyncer(s.cfg, nil) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) - syncer.flushCheckpointChan = make(chan FlushType, 16) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) diff --git a/syncer/util.go b/syncer/util.go index 2d95673964..f61a6d950f 100644 --- a/syncer/util.go +++ b/syncer/util.go @@ -14,6 +14,7 @@ package syncer import ( + "context" "fmt" "os" "strconv" @@ -100,3 +101,46 @@ func getDBConfigFromEnv() config.DBConfig { Port: port, } } + +type flushHelper struct { + ctx context.Context + flushCheckpointChan chan FlushType + finished chan struct{} +} + +func newFlusher(ctx context.Context) *flushHelper { + return &flushHelper{ + ctx: ctx, + flushCheckpointChan: make(chan FlushType, 16), + finished: make(chan struct{}, 4), + } +} + +func (f *flushHelper) addFlushRequest(typ FlushType, wait bool) { + select { + case <-f.ctx.Done(): + return + case f.flushCheckpointChan <- typ: + } + if wait { + select { + case <-f.ctx.Done(): + case <-f.finished: + } + } +} + +func (f *flushHelper) finishRequest(typ FlushType) { + if typ == NeedUpdate { + return + } + select { + case <-f.ctx.Done(): + return + case f.finished <- struct{}{}: + } +} + +func (f *flushHelper) close() { + close(f.finished) +} From dd54e6b17ead30e7f6b7864efc5bb728b5b8d443 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 10:29:39 +0800 Subject: [PATCH 09/16] fix ut --- syncer/syncer_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 91e20d4285..599108ece0 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1041,6 +1041,9 @@ func (s *testSyncerSuite) TestCasuality(c *C) { s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg, nil) syncer.jobs = []chan *job{make(chan *job, 1)} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + syncer.flusher = newFlusher(ctx) syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} From 5ab61c3a725dbabad323a3151c88d597429fe01c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 11:53:48 +0800 Subject: [PATCH 10/16] fix --- syncer/syncer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 599108ece0..bac4477edb 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1050,6 +1050,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { wg.Add(1) go func() { defer wg.Done() + syncer.flusher.finished <- struct{}{} job := <-syncer.jobs[0] c.Assert(job.tp, Equals, flush) syncer.jobWg.Done() From c1bef9fa9626eac62ce0509b7a558c2fbc379f97 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 10:35:27 +0800 Subject: [PATCH 11/16] address comments and fix bugs --- syncer/syncer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 7b06a43188..d599a4dba3 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -973,7 +973,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) - lastAddedXidPos := newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil) + lastAddedXidPos := newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil, false) clearF := func() { for i := 0; i < idx; i++ { @@ -1026,7 +1026,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, return } if sqlJob.tp == xid { - lastAddedXidPos.save(sqlJob.location.Clone(), nil) + lastAddedXidPos.save(sqlJob.currentLocation.Clone(), nil) continue } idx++ @@ -1066,16 +1066,16 @@ func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { updatePos := minPos.Clone() for _, workerCheckpoint := range s.workerCheckpoints { pos := workerCheckpoint.MySQLLocation() - if binlog.CompareLocation(pos, minPos) > 0 { - shouldUpdate := binlog.CompareLocation(updatePos, minPos) == 0 || - (typ == GetMinPos && binlog.CompareLocation(pos, updatePos) < 0) || - (typ == GetMaxPos && binlog.CompareLocation(pos, updatePos) > 0) + if binlog.CompareLocation(pos, minPos, false) > 0 { + shouldUpdate := binlog.CompareLocation(updatePos, minPos, false) == 0 || + (typ == GetMinPos && binlog.CompareLocation(pos, updatePos, false) < 0) || + (typ == GetMaxPos && binlog.CompareLocation(pos, updatePos, false) > 0) if shouldUpdate { updatePos = pos } } } - if binlog.CompareLocation(updatePos, minPos) > 0 { + if binlog.CompareLocation(updatePos, minPos, false) > 0 { s.saveGlobalPoint(updatePos) } } From 7301b669148eca24a1cdd03aaafceecf09ade1f5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 19:53:08 +0800 Subject: [PATCH 12/16] refine syncer error return --- pkg/atomic2/atomic.go | 33 ++++++++++++++++++++++++++++ pkg/atomic2/atomic_test.go | 44 ++++++++++++++++++++++++++++++++++++++ syncer/optimist.go | 4 ++-- syncer/syncer.go | 25 +++++++++++----------- 4 files changed, 92 insertions(+), 14 deletions(-) create mode 100644 pkg/atomic2/atomic.go create mode 100644 pkg/atomic2/atomic_test.go diff --git a/pkg/atomic2/atomic.go b/pkg/atomic2/atomic.go new file mode 100644 index 0000000000..7885856093 --- /dev/null +++ b/pkg/atomic2/atomic.go @@ -0,0 +1,33 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package atomic2 + +import ( + "sync/atomic" + "unsafe" +) + +type AtomicError struct { + p unsafe.Pointer +} + +// Get returns error +func (e AtomicError) Get() error { + return *(*error)(atomic.LoadPointer(&e.p)) +} + +// Set sets error to AtomicError +func (e AtomicError) Set(err error) { + atomic.StorePointer(&e.p, unsafe.Pointer(&err)) +} diff --git a/pkg/atomic2/atomic_test.go b/pkg/atomic2/atomic_test.go new file mode 100644 index 0000000000..3df8395630 --- /dev/null +++ b/pkg/atomic2/atomic_test.go @@ -0,0 +1,44 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package atomic2 + +import ( + "errors" + "testing" + + "github.com/pingcap/check" +) + +func TestT(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&testAtomicSuite{}) + +type testAtomicSuite struct{} + +func (t *testAtomicSuite) TestAtomicError(c *check.C) { + var ( + e AtomicError + err = errors.New("test") + ) + err2 := e.Get() + c.Assert(err, check.Equals, nil) + e.Set(err) + err2 = e.Get() + c.Assert(err, check.DeepEquals, err2) + e.Set(nil) + err2 = e.Get() + c.Assert(err, check.Equals, nil) +} diff --git a/syncer/optimist.go b/syncer/optimist.go index 8b5b17f8a1..9bcd24aeae 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -214,8 +214,8 @@ func (s *Syncer) handleQueryEventOptimistic( return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } for _, table := range onlineDDLTableNames { diff --git a/syncer/syncer.go b/syncer/syncer.go index d599a4dba3..748e542270 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/atomic2" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" "github.com/pingcap/dm/pkg/conn" @@ -178,7 +179,7 @@ type Syncer struct { // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError // record whether error occurred when execute SQLs - execErrorDetected sync2.AtomicBool + execError atomic2.AtomicError execErrors struct { sync.Mutex @@ -466,7 +467,7 @@ func (s *Syncer) reset() { s.newJobChans(s.cfg.WorkerCount + 1) s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) - s.execErrorDetected.Set(false) + s.execError.Set(nil) s.resetExecErrors() switch s.cfg.ShardMode { @@ -568,7 +569,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { close(runFatalChan) // Run returned, all potential fatal sent to s.runFatalChan wg.Wait() // wait for receive all fatal from s.runFatalChan - if err != nil { + if err != nil && !utils.IsContextCanceledError(err) { syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() errs = append(errs, unit.NewProcessError(err)) } @@ -580,7 +581,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } - if len(errs) != 0 { + if len(errs) != 0 || err != nil { // pause because of error occurred s.Pause() } @@ -844,8 +845,8 @@ func (s *Syncer) resetShardingGroup(schema, table string) { // // we may need to refactor the concurrency model to make the work-flow more clearer later func (s *Syncer) flushCheckPoints() error { - if s.execErrorDetected.Get() { - s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint)) + if err := s.execError.Get(); err != nil { + s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint), zap.Error(err)) return nil } @@ -955,7 +956,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } s.jobWg.Done() if err != nil { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -989,7 +990,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, } fatalF := func(err error) { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -1870,8 +1871,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // when add ddl job, will execute ddl and then flush checkpoint. // if execute ddl failed, the execErrorDetected will be true. - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) @@ -2072,8 +2073,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } if len(onlineDDLTableNames) > 0 { From 4bcc4218f73dd23e0c9babb5247ea793be2cb43b Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 19:56:35 +0800 Subject: [PATCH 13/16] fix check --- pkg/atomic2/atomic.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/atomic2/atomic.go b/pkg/atomic2/atomic.go index 7885856093..99005b5c9d 100644 --- a/pkg/atomic2/atomic.go +++ b/pkg/atomic2/atomic.go @@ -18,6 +18,7 @@ import ( "unsafe" ) +// AtomicError implements atomic error method type AtomicError struct { p unsafe.Pointer } From 9c9b42ac1b4769b9f5b4a377b61c029018edcea9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 20:28:21 +0800 Subject: [PATCH 14/16] fix atomic error bug --- pkg/atomic2/atomic.go | 10 +++++++--- pkg/atomic2/atomic_test.go | 6 ++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/atomic2/atomic.go b/pkg/atomic2/atomic.go index 99005b5c9d..e24cb616e0 100644 --- a/pkg/atomic2/atomic.go +++ b/pkg/atomic2/atomic.go @@ -24,11 +24,15 @@ type AtomicError struct { } // Get returns error -func (e AtomicError) Get() error { - return *(*error)(atomic.LoadPointer(&e.p)) +func (e *AtomicError) Get() error { + p := atomic.LoadPointer(&e.p) + if p == nil { + return nil + } + return *(*error)(p) } // Set sets error to AtomicError -func (e AtomicError) Set(err error) { +func (e *AtomicError) Set(err error) { atomic.StorePointer(&e.p, unsafe.Pointer(&err)) } diff --git a/pkg/atomic2/atomic_test.go b/pkg/atomic2/atomic_test.go index 3df8395630..05e52e31bc 100644 --- a/pkg/atomic2/atomic_test.go +++ b/pkg/atomic2/atomic_test.go @@ -35,10 +35,16 @@ func (t *testAtomicSuite) TestAtomicError(c *check.C) { ) err2 := e.Get() c.Assert(err, check.Equals, nil) + e.Set(err) err2 = e.Get() c.Assert(err, check.DeepEquals, err2) + e.Set(nil) err2 = e.Get() c.Assert(err, check.Equals, nil) + + err = errors.New("test2") + err2 = e.Get() + c.Assert(err2.Error(), check.Equals, "test") } From eca9c28f05f338354d488b644fecd01890ec9fed Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 20:52:38 +0800 Subject: [PATCH 15/16] fix ut --- pkg/atomic2/atomic_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/atomic2/atomic_test.go b/pkg/atomic2/atomic_test.go index 05e52e31bc..a49fd2fd1a 100644 --- a/pkg/atomic2/atomic_test.go +++ b/pkg/atomic2/atomic_test.go @@ -34,15 +34,15 @@ func (t *testAtomicSuite) TestAtomicError(c *check.C) { err = errors.New("test") ) err2 := e.Get() - c.Assert(err, check.Equals, nil) + c.Assert(err2, check.Equals, nil) e.Set(err) err2 = e.Get() - c.Assert(err, check.DeepEquals, err2) + c.Assert(err2, check.DeepEquals, err) e.Set(nil) err2 = e.Get() - c.Assert(err, check.Equals, nil) + c.Assert(err2, check.Equals, nil) err = errors.New("test2") err2 = e.Get() From 3373298c6ed1d20a7c1b2155066127bd511ff7d9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 6 May 2020 20:53:26 +0800 Subject: [PATCH 16/16] fix ut again --- pkg/atomic2/atomic_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/atomic2/atomic_test.go b/pkg/atomic2/atomic_test.go index a49fd2fd1a..b916d01ff3 100644 --- a/pkg/atomic2/atomic_test.go +++ b/pkg/atomic2/atomic_test.go @@ -40,11 +40,11 @@ func (t *testAtomicSuite) TestAtomicError(c *check.C) { err2 = e.Get() c.Assert(err2, check.DeepEquals, err) - e.Set(nil) - err2 = e.Get() - c.Assert(err2, check.Equals, nil) - err = errors.New("test2") err2 = e.Get() c.Assert(err2.Error(), check.Equals, "test") + + e.Set(nil) + err2 = e.Get() + c.Assert(err2, check.Equals, nil) }