From 79628d0ef57c73cf81689a533c9f292a0ca48792 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Jun 2021 18:02:38 +0800 Subject: [PATCH 1/7] add a test to reproduce BUG --- syncer/syncer.go | 41 ++++++++++++++++++++ tests/all_mode/data/db1.increment2.sql | 3 ++ tests/all_mode/data/db2.increment2.sql | 3 ++ tests/all_mode/run.sh | 52 ++++++++++++++++++++++++-- 4 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 tests/all_mode/data/db1.increment2.sql create mode 100644 tests/all_mode/data/db2.increment2.sql diff --git a/syncer/syncer.go b/syncer/syncer.go index 3924d20342..0eadd7088e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -742,7 +742,34 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { s.checkpoint.SaveTablePoint(db, table, location, ti) } +// only used in tests +var ( + lastPos mysql.Position + lastPosNum int + waitJobsDone bool + failExecuteSQL bool + failOnce atomic.Bool +) + func (s *Syncer) addJob(job *job) error { + failpoint.Inject("countJobFromOneEvent", func() { + if job.currentLocation.Position.Compare(lastPos) == 0 { + lastPosNum += 1 + } else { + lastPos = job.currentLocation.Position + lastPosNum = 1 + } + // trigger a flush after seeing one job + if lastPosNum == 1 { + waitJobsDone = true + s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos)) + } + // mock an execution error after seeing two jobs.
+ if lastPosNum == 2 { + failExecuteSQL = true + s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos)) + } + }) var queueBucket int switch job.tp { case xid: @@ -781,6 +808,13 @@ func (s *Syncer) addJob(job *job) error { // nolint:ifshort wait := s.checkWait(job) + failpoint.Inject("flushFirstJobOfEvent", func() { + if waitJobsDone { + s.tctx.L().Info("trigger flushFirstJobOfEvent") + waitJobsDone = false + wait = true + } + }) if wait { s.jobWg.Wait() s.c.reset() @@ -1049,6 +1083,13 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo return 0, nil } + failpoint.Inject("failSecondJobOfEvent", func() { + if failExecuteSQL && failOnce.CAS(false, true) { + s.tctx.L().Info("trigger failSecondJobOfEvent") + failpoint.Return(0, errors.New("failSecondJobOfEvent")) + } + }) + select { case <-tctx.Ctx.Done(): // do not execute queries anymore, because they should be failed with a done context. diff --git a/tests/all_mode/data/db1.increment2.sql b/tests/all_mode/data/db1.increment2.sql new file mode 100644 index 0000000000..87fc44acad --- /dev/null +++ b/tests/all_mode/data/db1.increment2.sql @@ -0,0 +1,3 @@ +use all_mode; +insert into t1 (id, name) values (10, '10'), (20, '20'); +insert into t1 (id, name) values (30, '30'); diff --git a/tests/all_mode/data/db2.increment2.sql b/tests/all_mode/data/db2.increment2.sql new file mode 100644 index 0000000000..1d579a06d8 --- /dev/null +++ b/tests/all_mode/data/db2.increment2.sql @@ -0,0 +1,3 @@ +use all_mode; +insert into t2 (id, name) values (10, '10'), (20, '20'); +insert into t2 (id, name) values (30, '30'); diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index e01a9a7118..0b386e6ea1 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -173,10 +173,56 @@ function test_stop_task_before_checkpoint() { export GO_FAILPOINTS='' } +function test_fail_job_between_event() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 
$MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 + + inject_points=( + "github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + "github.com/pingcap/dm/syncer/countJobFromOneEvent=return()" + "github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()" + "github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "s/enable-gtid: true/enable-gtid: false/g" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" + + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker1/log/dm-worker.log + check_log_contain_with_retry "failSecondJobOfEvent" 
$WORK_DIR/worker2/log/dm-worker.log + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + read -p 666 +} + function run() { - run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" - run_sql_source1 "SET @@global.time_zone = '+01:00';" - run_sql_source2 "SET @@global.time_zone = '+02:00';" +# run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" +# run_sql_source1 "SET @@global.time_zone = '+01:00';" +# run_sql_source2 "SET @@global.time_zone = '+02:00';" + test_fail_job_between_event + test_session_config test_query_timeout From 07399b73cbf62b0e5487cf2037e311beb1dcad4f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Jun 2021 18:09:48 +0800 Subject: [PATCH 2/7] fix CI --- syncer/syncer.go | 12 ++++++------ tests/all_mode/run.sh | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 0eadd7088e..d3a74c662f 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -742,19 +742,19 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) { s.checkpoint.SaveTablePoint(db, table, location, ti) } -// only used in tests +// only used in tests. 
var ( - lastPos mysql.Position - lastPosNum int - waitJobsDone bool + lastPos mysql.Position + lastPosNum int + waitJobsDone bool failExecuteSQL bool - failOnce atomic.Bool + failOnce atomic.Bool ) func (s *Syncer) addJob(job *job) error { failpoint.Inject("countJobFromOneEvent", func() { if job.currentLocation.Position.Compare(lastPos) == 0 { - lastPosNum += 1 + lastPosNum++ } else { lastPos = job.currentLocation.Position lastPosNum = 1 diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 0b386e6ea1..83da653d84 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -174,17 +174,17 @@ function test_stop_task_before_checkpoint() { } function test_fail_job_between_event() { - run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 check_contains 'Query OK, 3 rows affected' - # start DM worker and master + # start DM worker and master run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 - inject_points=( + inject_points=( "github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" "github.com/pingcap/dm/syncer/countJobFromOneEvent=return()" "github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()" @@ -218,9 +218,9 @@ function test_fail_job_between_event() { } function run() { -# run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" -# run_sql_source1 "SET @@global.time_zone = '+01:00';" -# run_sql_source2 "SET @@global.time_zone = '+02:00';" + # run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" + # run_sql_source1 "SET @@global.time_zone = '+01:00';" + # run_sql_source2 "SET @@global.time_zone = '+02:00';" 
test_fail_job_between_event test_session_config From 39c837f4aa4e8005873a33eb1788fa958c6fb4b5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 9 Jun 2021 18:32:30 +0800 Subject: [PATCH 3/7] implement --- syncer/checkpoint.go | 30 ++++++++++++------------------ syncer/checkpoint_test.go | 32 ++++++++++++++++---------------- syncer/syncer.go | 4 ++-- tests/all_mode/run.sh | 8 ++++---- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index e2ff9ec75d..f529393dc5 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -198,8 +198,8 @@ type CheckPoint interface { // DeleteSchemaPoint deletes checkpoint for specified schema DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error - // IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint - IsNewerTablePoint(sourceSchema, sourceTable string, point binlog.Location, gte bool) bool + // IsOlderThanTablePoint checks whether job's checkpoint is newer than previous saved checkpoint + IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location) bool // SaveGlobalPoint saves the global binlog stream's checkpoint // corresponding to Meta.Save @@ -443,34 +443,28 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche return nil } -// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint. -// gte means greater than or equal, gte should judge by EnableGTID and the event type -// - when enable GTID and binlog is DML, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are: -// - Query event, location is gset1 -// - Rows event, location is gset1 +// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. 
+// For GTID replication, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are: +// - Query event e1, location is gset1 +// - Rows event e2, location is gset1 // - XID event, location is gset2 -// after syncer handle query event, will save table point with gset1, and when handle rows event, will compare the rows's location with table checkpoint's location in `IsNewerTablePoint`, and these two location have same gset, so we should use `>=` to compare location in this case. -// - when enable GTID and binlog is DDL, different DDL have different GTID set, so if GTID set is euqal, it is a old table point, should use `>` to compare location in this case. -// - when not enable GTID, just compare the position, and only when grater than the old point is newer table point, should use `>` to compare location is this case. -func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, location binlog.Location, gte bool) bool { +// We should note that e1 is not older than e2 +// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position. +func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location) bool { cp.RLock() defer cp.RUnlock() mSchema, ok := cp.points[sourceSchema] if !ok { - return true + return false } point, ok := mSchema[sourceTable] if !ok { - return true + return false } oldLocation := point.MySQLLocation() cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation)) - if gte { - return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0 - } - - return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) > 0 + return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0 } // SaveGlobalPoint implements CheckPoint.SaveGlobalPoint. 
diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 804f60cf9c..be11fc0595 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -340,23 +340,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { ) // not exist - newer := cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsTrue) + older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsFalse) // save cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsTrue) // rollback, to min cp.Rollback(s.tracker) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsFalse) // save again cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsTrue) // flush + rollback s.mock.ExpectBegin() @@ -365,13 +365,13 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) cp.Rollback(s.tracker) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsTrue) // save cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - 
c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsTrue) // delete s.mock.ExpectBegin() @@ -431,8 +431,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.Clear(tctx) c.Assert(err, IsNil) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsFalse) // test save table point less than global point func() { @@ -452,8 +452,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) cp.Rollback(s.tracker) - newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + c.Assert(older, IsFalse) s.mock.ExpectBegin() s.mock.ExpectExec(clearCheckPointSQL).WithArgs(cpid).WillReturnResult(sqlmock.NewResult(0, 1)) diff --git a/syncer/syncer.go b/syncer/syncer.go index d3a74c662f..d7dddf1817 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1663,7 +1663,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } // DML position before table checkpoint, ignore it - if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentLocation, s.cfg.EnableGTID) { + if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation) { ec.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("location", ec.currentLocation), zap.String("origin schema", originSchema), zap.String("origin table", originTable)) return nil } @@ -1904,7 +1904,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o // for 
DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine // ignore obsolete DDL here can avoid to try-sync again for already synced DDLs - if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, false) { + if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation) { ec.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation)) continue } diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 83da653d84..f6039e2338 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -214,13 +214,13 @@ function test_fail_job_between_event() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"result\": true" 3 - read -p 666 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml } function run() { - # run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" - # run_sql_source1 "SET @@global.time_zone = '+01:00';" - # run_sql_source2 "SET @@global.time_zone = '+02:00';" + run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" + run_sql_source1 "SET @@global.time_zone = '+01:00';" + run_sql_source2 "SET @@global.time_zone = '+02:00';" test_fail_job_between_event test_session_config From 287971e49e17441bb62069011bb16204750a0ad7 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 9 Jun 2021 18:37:08 +0800 Subject: [PATCH 4/7] fix comment of interface --- syncer/checkpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index f529393dc5..34219fffe2 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -198,7 +198,7 @@ type CheckPoint interface { // DeleteSchemaPoint deletes checkpoint for specified schema DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error - // 
IsOlderThanTablePoint checks whether job's checkpoint is newer than previous saved checkpoint + // IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location) bool // SaveGlobalPoint saves the global binlog stream's checkpoint From c466f68fa250fdf2e36d703e8833adfa0c4bf8e5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 9 Jun 2021 19:00:54 +0800 Subject: [PATCH 5/7] fix one case, another one is failed --- tests/all_mode/run.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 1139801d82..aa463e2d88 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -216,6 +216,11 @@ function test_fail_job_between_event() { "query-status test" \ "\"result\": true" 3 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + cleanup_data all_mode + cleanup_process $* + + export GO_FAILPOINTS='' } function run() { From ffbd8011f92cde76aeda2277498047e2a12e92b2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 9 Jun 2021 19:32:17 +0800 Subject: [PATCH 6/7] fix test --- syncer/checkpoint.go | 8 ++++++-- syncer/checkpoint_test.go | 16 ++++++++-------- syncer/syncer.go | 12 ++++++------ 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 34219fffe2..33529cc22a 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -199,7 +199,7 @@ type CheckPoint interface { DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error // IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint - IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location) bool + IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location, useLE bool) bool // SaveGlobalPoint saves the global binlog stream's checkpoint // corresponding to Meta.Save @@ -450,7 +450,8 @@ func (cp 
*RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche // - XID event, location is gset2 // We should note that e1 is not older than e2 // For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position. -func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location) bool { +// if useLE is true, we use less than or equal. +func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location, useLE bool) bool { cp.RLock() defer cp.RUnlock() mSchema, ok := cp.points[sourceSchema] @@ -464,6 +465,9 @@ func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable stri oldLocation := point.MySQLLocation() cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation)) + if useLE { + return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0 + } return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0 } diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index be11fc0595..bf423371f1 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -340,22 +340,22 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { ) // not exist - older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) // save cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) // rollback, to min cp.Rollback(s.tracker) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = 
cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) // save again cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) // flush + rollback @@ -365,12 +365,12 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) cp.Rollback(s.tracker) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) // save cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) // delete @@ -431,7 +431,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.Clear(tctx) c.Assert(err, IsNil) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) // test save table point less than global point @@ -452,7 +452,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) cp.Rollback(s.tracker) - older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}) + older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) s.mock.ExpectBegin() diff --git a/syncer/syncer.go b/syncer/syncer.go index 2cfb30f7e7..567a5dbb71 100644 --- 
a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1177,13 +1177,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var ( flushCheckpoint bool - cleanDumpFile = s.cfg.CleanDumpFile + cleanDumpFile = s.cfg.CleanDumpFile && fresh ) flushCheckpoint, err = s.adjustGlobalPointGTID(tctx) if err != nil { return err } - if s.cfg.Mode == config.ModeAll { + if s.cfg.Mode == config.ModeAll && fresh { flushCheckpoint = true err = s.loadTableStructureFromDump(ctx) if err != nil { @@ -1676,7 +1676,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } // DML position before table checkpoint, ignore it - if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation) { + if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation, false) { ec.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("location", ec.currentLocation), zap.String("origin schema", originSchema), zap.String("origin table", originTable)) return nil } @@ -1915,9 +1915,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o continue } - // for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine - // ignore obsolete DDL here can avoid to try-sync again for already synced DDLs - if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation) { + // DDL is sequentially synchronized in this syncer's main process goroutine + // ignore DDL that is older than or the same as the table checkpoint, to avoid try-syncing again for already synced DDLs + if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, true) { ec.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation)) continue } From
04cbdf30b83160d816f135d7dc65fd29a1c53f86 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 12 Jun 2021 01:04:03 +0800 Subject: [PATCH 7/7] forget to rename the sql file --- tests/all_mode/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 93eae55017..fa612b429e 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -305,8 +305,8 @@ function test_fail_job_between_event() { dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" - run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker1/log/dm-worker.log check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker2/log/dm-worker.log run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \