diff --git a/pkg/atomic2/atomic.go b/pkg/atomic2/atomic.go new file mode 100644 index 0000000000..e24cb616e0 --- /dev/null +++ b/pkg/atomic2/atomic.go @@ -0,0 +1,38 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package atomic2 + +import ( + "sync/atomic" + "unsafe" +) + +// AtomicError implements atomic error method +type AtomicError struct { + p unsafe.Pointer +} + +// Get returns error +func (e *AtomicError) Get() error { + p := atomic.LoadPointer(&e.p) + if p == nil { + return nil + } + return *(*error)(p) +} + +// Set sets error to AtomicError +func (e *AtomicError) Set(err error) { + atomic.StorePointer(&e.p, unsafe.Pointer(&err)) +} diff --git a/pkg/atomic2/atomic_test.go b/pkg/atomic2/atomic_test.go new file mode 100644 index 0000000000..b916d01ff3 --- /dev/null +++ b/pkg/atomic2/atomic_test.go @@ -0,0 +1,50 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package atomic2 + +import ( + "errors" + "testing" + + "github.com/pingcap/check" +) + +func TestT(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&testAtomicSuite{}) + +type testAtomicSuite struct{} + +func (t *testAtomicSuite) TestAtomicError(c *check.C) { + var ( + e AtomicError + err = errors.New("test") + ) + err2 := e.Get() + c.Assert(err2, check.Equals, nil) + + e.Set(err) + err2 = e.Get() + c.Assert(err2, check.DeepEquals, err) + + err = errors.New("test2") + err2 = e.Get() + c.Assert(err2.Error(), check.Equals, "test") + + e.Set(nil) + err2 = e.Get() + c.Assert(err2, check.Equals, nil) +} diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 3734fc2c43..409b6e5385 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -433,8 +433,8 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location) { // FlushPointsExcept implements CheckPoint.FlushPointsExcept func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { - cp.RLock() - defer cp.RUnlock() + cp.Lock() + defer cp.Unlock() // convert slice to map excepts := make(map[string]map[string]struct{}) diff --git a/syncer/optimist.go b/syncer/optimist.go index 8b5b17f8a1..9bcd24aeae 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -214,8 +214,8 @@ func (s *Syncer) handleQueryEventOptimistic( return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } for _, table := range onlineDDLTableNames { diff --git a/syncer/syncer.go b/syncer/syncer.go index 14e5c0549e..748e542270 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/atomic2" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" "github.com/pingcap/dm/pkg/conn" @@ -87,6 +88,26 @@ const ( LocalBinlog ) +// FlushType represents flush checkpoint type +type FlushType uint8 + +// flush checkpoint type +const ( + // For ddl job the global checkpoint will be updated in addJobFunc, so we needn't update and flush directly + NoNeedUpdate FlushType = iota + 1 + // If global checkpoint hasn't been updated for more than 30s, or receive a flush job, + // we send a NeedUpdate request to async flush checkpoint go routine. + // It will update global checkpoint to minPos of workers and then flush checkpoint + NeedUpdate +) + +const ( + // GetMinPos gets min pos from checkpoints + GetMinPos int = iota + 1 + // GetMaxPos gets max pos from checkpoints + GetMaxPos +) + // Syncer can sync your MySQL data to another MySQL database. type Syncer struct { sync.RWMutex @@ -122,6 +143,9 @@ type Syncer struct { jobsChanLock sync.Mutex queueBucketMapping []string + workerCheckpoints []*binlogPoint + flusher *flushHelper + c *causality tableRouter *router.Table @@ -155,7 +179,7 @@ type Syncer struct { // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError // record whether error occurred when execute SQLs - execErrorDetected sync2.AtomicBool + execError atomic2.AtomicError execErrors struct { sync.Mutex @@ -441,8 +465,9 @@ func (s *Syncer) reset() { } // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) + s.workerCheckpoints = makeWorkerCheckpointArray(s.cfg.WorkerCount, s.cfg.Flavor) - s.execErrorDetected.Set(false) + s.execError.Set(nil) s.resetExecErrors() switch s.cfg.ShardMode { @@ -544,7 +569,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { close(runFatalChan) // Run returned, all potential fatal sent to s.runFatalChan wg.Wait() // wait for receive all fatal from s.runFatalChan - if err != nil { + if err != nil && !utils.IsContextCanceledError(err) { syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() errs = append(errs, unit.NewProcessError(err)) } @@ -556,7 +581,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } - if len(errs) != 0 { + if len(errs) != 0 || err != nil { // pause because of error occurred s.Pause() } @@ -676,18 +701,6 @@ func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int6 } } -func (s *Syncer) checkWait(job *job) bool { - if job.tp == ddl { - return true - } - - if s.checkpoint.CheckGlobalPoint() { - return true - } - - return false -} - func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { ti, err := s.schemaTracker.GetTable(db, table) if err != nil { @@ -701,12 +714,15 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { } func (s *Syncer) addJob(job *job) error { - var ( - queueBucket int - ) + var queueBucket int switch job.tp { case xid: - s.saveGlobalPoint(job.location) + if s.cfg.ShardMode == config.ShardPessimistic { + job.location = s.sgk.AdjustGlobalLocation(job.location) + } + for i := 0; i < s.cfg.WorkerCount; i++ { + s.jobs[i] <- job + } return nil case flush: addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -720,7 +736,9 @@ func (s *Syncer) addJob(job *job) error { } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() - return s.flushCheckPoints() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) + s.flusher.addFlushRequest(NoNeedUpdate, true) + return nil case ddl: s.jobWg.Wait() addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() @@ -739,7 +757,7 @@ func (s *Syncer) addJob(job *job) error { addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds()) } - wait := s.checkWait(job) + wait := job.tp == ddl if wait { s.jobWg.Wait() s.c.reset() @@ -782,15 +800,16 @@ func (s *Syncer) addJob(job *job) error { } } - if wait { - // interrupted after save checkpoint and before flush checkpoint. + if s.checkpoint.CheckGlobalPoint() { + s.flusher.addFlushRequest(NeedUpdate, false) + } else if wait { failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint") if err != nil { failpoint.Return(err) } }) - return s.flushCheckPoints() + s.flusher.addFlushRequest(NoNeedUpdate, true) } return nil @@ -826,8 +845,8 @@ func (s *Syncer) resetShardingGroup(schema, table string) { // // we may need to refactor the concurrency model to make the work-flow more clearer later func (s *Syncer) flushCheckPoints() error { - if s.execErrorDetected.Get() { - s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint)) + if err := s.execError.Get(); err != nil { + s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint), zap.Error(err)) return nil } @@ -937,7 +956,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } s.jobWg.Done() if err != nil { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -947,13 +966,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } } -func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jobChan chan *job) { +func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, + jobChan chan *job, workerCheckpoint *binlogPoint) { defer s.wg.Done() idx := 0 count := s.cfg.Batch jobs := make([]*job, 0, count) tpCnt := make(map[opType]int64) + lastAddedXidPos := newBinlogPoint(binlog.NewLocation(s.cfg.Flavor), binlog.NewLocation(s.cfg.Flavor), nil, nil, false) clearF := func() { for i := 0; i < idx; i++ { @@ -969,7 +990,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo } fatalF := func(err error) { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -978,6 +999,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo executeSQLs := func() error { if len(jobs) == 0 { + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) return nil } queries := make([]string, 0, len(jobs)) @@ -990,6 +1012,8 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if err != nil { errCtx := &ExecErrorContext{err, jobs[affected].currentLocation.Clone(), fmt.Sprintf("%v", jobs)} s.appendExecErrors(errCtx) + } else { + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) } return err } @@ -1002,6 +1026,10 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if !ok { return } + if sqlJob.tp == xid { + lastAddedXidPos.save(sqlJob.currentLocation.Clone(), nil) + continue + } idx++ if sqlJob.tp != flush && len(sqlJob.sql) > 0 { @@ -1026,9 +1054,54 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo continue } clearF() + } else { + workerCheckpoint.save(lastAddedXidPos.MySQLLocation(), nil) + time.Sleep(waitTime) + } + } + } +} + +func (s *Syncer) updateGlobalCheckpointFromWorkers(typ int) { + minPos := binlog.NewLocation(s.cfg.Flavor) + updatePos := minPos.Clone() + for _, workerCheckpoint := range s.workerCheckpoints { + pos := workerCheckpoint.MySQLLocation() + if binlog.CompareLocation(pos, minPos, false) > 0 { + shouldUpdate := binlog.CompareLocation(updatePos, minPos, false) == 0 || + (typ == GetMinPos && binlog.CompareLocation(pos, updatePos, false) < 0) || + (typ == GetMaxPos && binlog.CompareLocation(pos, updatePos, false) > 0) + if shouldUpdate { + updatePos = pos } } } + if binlog.CompareLocation(updatePos, minPos, false) > 0 { + s.saveGlobalPoint(updatePos) + } +} + +func (s *Syncer) asyncFlushCheckpoint(ctx context.Context, flusher *flushHelper) { + defer flusher.close() + for { + select { + case flushType := <-flusher.flushCheckpointChan: + if flushType == NeedUpdate { + s.updateGlobalCheckpointFromWorkers(GetMinPos) + } + err := s.flushCheckPoints() + if err != nil { + if !utils.IsContextCanceledError(err) { + s.runFatalChan <- unit.NewProcessError(err) + return + } + } + flusher.finishRequest(flushType) + case <-ctx.Done(): + s.tctx.L().Info("async flush checkpoint routine exits", log.ShortError(ctx.Err())) + return + } + } } // Run starts running for sync, we should guarantee it can rerun when paused. @@ -1082,11 +1155,18 @@ func (s *Syncer) Run(ctx context.Context) (err error) { go func(i int, n string) { ctx2, cancel := context.WithCancel(ctx) ctctx := s.tctx.WithContext(ctx2) - s.sync(ctctx, n, s.toDBConns[i], s.jobs[i]) + s.sync(ctctx, n, s.toDBConns[i], s.jobs[i], s.workerCheckpoints[i]) cancel() }(i, name) } + s.wg.Add(1) + s.flusher = newFlusher(ctx) + go func() { + defer s.wg.Done() + s.asyncFlushCheckpoint(ctx, s.flusher) + }() + s.queueBucketMapping = append(s.queueBucketMapping, adminQueueName) s.wg.Add(1) go func() { @@ -1110,6 +1190,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) if err2 := s.flushCheckPoints(); err2 != nil { s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } @@ -1790,8 +1871,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // when add ddl job, will execute ddl and then flush checkpoint. // if execute ddl failed, the execErrorDetected will be true. - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) @@ -1941,6 +2022,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if shardOp.Exec { failpoint.Inject("ShardSyncedExecutionExit", func() { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) }) @@ -1950,6 +2033,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // exit in the first round sequence sharding DDL only if group.meta.ActiveIdx() == 1 { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit")) + s.jobWg.Wait() + s.updateGlobalCheckpointFromWorkers(GetMaxPos) s.flushCheckPoints() utils.OsExit(1) } @@ -1988,8 +2073,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if err := s.execError.Get(); err != nil { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } if len(onlineDDLTableNames) > 0 { @@ -2361,8 +2446,9 @@ func (s *Syncer) Pause() { s.tctx.L().Warn("try to pause, but already closed") return } - + s.Lock() s.stopSync() + s.Unlock() } // Resume resumes the paused process @@ -2599,3 +2685,11 @@ func (s *Syncer) ShardDDLInfo() *pessimism.Info { func (s *Syncer) ShardDDLOperation() *pessimism.Operation { return s.pessimist.PendingOperation() } + +func makeWorkerCheckpointArray(workerCount int, flavor string) []*binlogPoint { + workerCheckpoints := make([]*binlogPoint, 0, workerCount) + for i := 0; i < workerCount; i++ { + workerCheckpoints = append(workerCheckpoints, newBinlogPoint(binlog.NewLocation(flavor), binlog.NewLocation(flavor), nil, nil, false)) + } + return workerCheckpoints +} diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 5c3761e975..de97df75a7 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1041,11 +1041,16 @@ func (s *testSyncerSuite) TestCasuality(c *C) { s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg, nil) syncer.jobs = []chan *job{make(chan *job, 1)} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + syncer.flusher = newFlusher(ctx) + syncer.workerCheckpoints = makeWorkerCheckpointArray(1, s.cfg.Flavor) syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) go func() { defer wg.Done() + syncer.flusher.finished <- struct{}{} job := <-syncer.jobs[0] c.Assert(job.tp, Equals, flush) syncer.jobWg.Done() diff --git a/syncer/util.go b/syncer/util.go index c264e71e3f..74eb28788c 100644 --- a/syncer/util.go +++ b/syncer/util.go @@ -14,6 +14,7 @@ package syncer import ( + "context" "fmt" "os" "strconv" @@ -103,6 +104,49 @@ func getDBConfigFromEnv() config.DBConfig { } } +type flushHelper struct { + ctx context.Context + flushCheckpointChan chan FlushType + finished chan struct{} +} + +func newFlusher(ctx context.Context) *flushHelper { + return &flushHelper{ + ctx: ctx, + flushCheckpointChan: make(chan FlushType, 16), + finished: make(chan struct{}, 4), + } +} + +func (f *flushHelper) addFlushRequest(typ FlushType, wait bool) { + select { + case <-f.ctx.Done(): + return + case f.flushCheckpointChan <- typ: + } + if wait { + select { + case <-f.ctx.Done(): + case <-f.finished: + } + } +} + +func (f *flushHelper) finishRequest(typ FlushType) { + if typ == NeedUpdate { + return + } + select { + case <-f.ctx.Done(): + return + case f.finished <- struct{}{}: + } +} + +func (f *flushHelper) close() { + close(f.finished) +} + // record source tbls record the tables that need to flush checkpoints func recordSourceTbls(sourceTbls map[string]map[string]struct{}, stmt ast.StmtNode, table *filter.Table) { schema, name := table.Schema, table.Name diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 5c5827e32d..a3c97140fd 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -195,29 +195,28 @@ function run() { update_master_config_success $dm_master_conf cmp $dm_master_conf $cur/conf/dm-master.toml -# TODO: The ddl sharding part for DM-HA still has some problem. This should be uncommented when it's fixed. -# run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 -# set +e -# i=0 -# while [ $i -lt 10 ] -# do -# show_ddl_locks_with_locks "$TASK_NAME-\`dmctl\`.\`t_target\`" "ALTER TABLE \`dmctl\`.\`t_target\` DROP COLUMN \`d\`" -# ((i++)) -# if [ "$?" != 0 ]; then -# echo "wait 1s and check for the $i-th time" -# sleep 1 -# else -# break -# fi -# done -# set -e -# if [ $i -ge 10 ]; then -# echo "show_ddl_locks_with_locks check timeout" -# exit 1 -# fi -# run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 -# check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 -# show_ddl_locks_no_locks $TASK_NAME + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + set +e + i=0 + while [ $i -lt 10 ] + do + show_ddl_locks_with_locks "$TASK_NAME-\`dmctl\`.\`t_target\`" "ALTER TABLE \`dmctl\`.\`t_target\` DROP COLUMN \`d\`" + ((i++)) + if [ "$?" != 0 ]; then + echo "wait 1s and check for the $i-th time" + sleep 1 + else + break + fi + done + set -e + if [ $i -ge 10 ]; then + echo "show_ddl_locks_with_locks check timeout" + exit 1 + fi + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 + show_ddl_locks_no_locks $TASK_NAME # sleep 1s to ensure syncer unit has flushed global checkpoint and updates # updated ActiveRelayLog @@ -225,11 +224,21 @@ function run() { server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql "show binary logs\G" $MYSQL_PORT1 $MYSQL_PASSWORD1 max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}') + echo "max_binlog_name: $max_binlog_name" binlog_count=$(grep Log_name "$SQL_RESULT_FILE" | wc -l) + echo "binlog_count: $binlog_count" relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "relay_log_count: $relay_log_count" [ "$binlog_count" -eq "$relay_log_count" ] + + # use ddl to flush checkpoint + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table \`dmctl\`.\`t_1\` add column d varchar(10);alter table \`dmctl\`.\`t_2\` add column d varchar(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 + purge_relay_success $max_binlog_name $SOURCE_ID1 new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) + echo "new_relay_log_count: $new_relay_log_count" [ "$new_relay_log_count" -eq 1 ] } diff --git a/tests/retry_cancel/run.sh b/tests/retry_cancel/run.sh index 03c32867e8..2f472cde04 100755 --- a/tests/retry_cancel/run.sh +++ b/tests/retry_cancel/run.sh @@ -103,7 +103,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - + # execute ddl to make sure sync checkpoint is flushed + run_sql "ALTER TABLE \`retry_cancel\`.\`t1\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "ALTER TABLE \`retry_cancel\`.\`t2\` ADD COLUMN info VARCHAR(10);" $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 3 # ---------- test for incremental replication ---------- # stop DM-worker, then enable failponits kill_dm_worker