Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ type CheckPoint interface {
// DeleteSchemaPoint deletes checkpoint for specified schema
DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
IsNewerTablePoint(sourceSchema, sourceTable string, point binlog.Location, gte bool) bool
// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location, useLE bool) bool

// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
Expand Down Expand Up @@ -443,34 +443,32 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche
return nil
}

// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint.
// gte means greater than or equal, gte should judge by EnableGTID and the event type
// - when enable GTID and binlog is DML, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
// - Query event, location is gset1
// - Rows event, location is gset1
// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// For GTID replication, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
// - Query event e1, location is gset1
// - Rows event e2, location is gset1
// - XID event, location is gset2
// after syncer handle query event, will save table point with gset1, and when handle rows event, will compare the rows's location with table checkpoint's location in `IsNewerTablePoint`, and these two location have same gset, so we should use `>=` to compare location in this case.
// - when enable GTID and binlog is DDL, different DDL have different GTID set, so if GTID set is euqal, it is a old table point, should use `>` to compare location in this case.
// - when not enable GTID, just compare the position, and only when grater than the old point is newer table point, should use `>` to compare location is this case.
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, location binlog.Location, gte bool) bool {
// We should note that e1 is not older than e2
// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
// if useLE is true, we use less than or equal.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location, useLE bool) bool {
cp.RLock()
defer cp.RUnlock()
mSchema, ok := cp.points[sourceSchema]
if !ok {
return true
return false
}
point, ok := mSchema[sourceTable]
if !ok {
return true
return false
}
oldLocation := point.MySQLLocation()
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))

if gte {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
if useLE {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0
}

return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) > 0
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0
}

// SaveGlobalPoint implements CheckPoint.SaveGlobalPoint.
Expand Down
32 changes: 16 additions & 16 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
)

// not exist
newer := cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// rollback, to min
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// save again
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// flush + rollback
s.mock.ExpectBegin()
Expand All @@ -365,13 +365,13 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// save
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// delete
s.mock.ExpectBegin()
Expand Down Expand Up @@ -431,8 +431,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectCommit()
err = cp.Clear(tctx)
c.Assert(err, IsNil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// test save table point less than global point
func() {
Expand All @@ -452,8 +452,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
c.Assert(err, IsNil)
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

s.mock.ExpectBegin()
s.mock.ExpectExec(clearCheckPointSQL).WithArgs(cpid).WillReturnResult(sqlmock.NewResult(0, 1))
Expand Down
53 changes: 47 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,34 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) {
s.checkpoint.SaveTablePoint(db, table, location, ti)
}

// only used in tests.
var (
lastPos mysql.Position
lastPosNum int
waitJobsDone bool
failExecuteSQL bool
failOnce atomic.Bool
)

func (s *Syncer) addJob(job *job) error {
failpoint.Inject("countJobFromOneEvent", func() {
if job.currentLocation.Position.Compare(lastPos) == 0 {
lastPosNum++
} else {
lastPos = job.currentLocation.Position
lastPosNum = 1
}
// trigger a flush after see one job
if lastPosNum == 1 {
waitJobsDone = true
s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
}
// mock a execution error after see two jobs.
if lastPosNum == 2 {
failExecuteSQL = true
s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
}
})
var queueBucket int
switch job.tp {
case xid:
Expand Down Expand Up @@ -838,6 +865,13 @@ func (s *Syncer) addJob(job *job) error {

// nolint:ifshort
wait := s.checkWait(job)
failpoint.Inject("flushFirstJobOfEvent", func() {
if waitJobsDone {
s.tctx.L().Info("trigger flushFirstJobOfEvent")
waitJobsDone = false
wait = true
}
})
if wait {
s.jobWg.Wait()
s.c.reset()
Expand Down Expand Up @@ -1122,6 +1156,13 @@ func (s *Syncer) syncDML(tctx *tcontext.Context, queueBucket string, db *DBConn,
return 0, nil
}

failpoint.Inject("failSecondJobOfEvent", func() {
if failExecuteSQL && failOnce.CAS(false, true) {
s.tctx.L().Info("trigger failSecondJobOfEvent")
failpoint.Return(0, errors.New("failSecondJobOfEvent"))
}
})

select {
case <-tctx.Ctx.Done():
// do not execute queries anymore, because they should be failed with a done context.
Expand Down Expand Up @@ -1210,13 +1251,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

var (
flushCheckpoint bool
cleanDumpFile = s.cfg.CleanDumpFile
cleanDumpFile = s.cfg.CleanDumpFile && fresh
)
flushCheckpoint, err = s.adjustGlobalPointGTID(tctx)
if err != nil {
return err
}
if s.cfg.Mode == config.ModeAll {
if s.cfg.Mode == config.ModeAll && fresh {
flushCheckpoint = true
err = s.loadTableStructureFromDump(ctx)
if err != nil {
Expand Down Expand Up @@ -1742,7 +1783,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}

// DML position before table checkpoint, ignore it
if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentLocation, s.cfg.EnableGTID) {
if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation, false) {
ec.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("location", ec.currentLocation), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
return nil
}
Expand Down Expand Up @@ -1983,9 +2024,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
continue
}

// for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine
// ignore obsolete DDL here can avoid to try-sync again for already synced DDLs
if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, false) {
// DDL is sequentially synchronized in this syncer's main process goroutine
// ignore DDL that is older or same as table checkpoint, to try-sync again for already synced DDLs
if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, true) {
ec.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation))
continue
}
Expand Down
3 changes: 3 additions & 0 deletions tests/all_mode/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t1 (id, name) values (10, '10'), (20, '20');
insert into t1 (id, name) values (30, '30');
3 changes: 3 additions & 0 deletions tests/all_mode/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t2 (id, name) values (10, '10'), (20, '20');
insert into t2 (id, name) values (30, '30');
51 changes: 50 additions & 1 deletion tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,60 @@ function test_stop_task_before_checkpoint() {
export GO_FAILPOINTS=''
}

function test_fail_job_between_event() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
check_metric $MASTER_PORT 'start_leader_counter' 3 0 2

inject_points=(
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "s/enable-gtid: true/enable-gtid: false/g" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"

run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker2/log/dm-worker.log
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

cleanup_data all_mode
cleanup_process $*

export GO_FAILPOINTS=''
}

function run() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
run_sql_source1 "SET @@global.time_zone = '+01:00';"
run_sql_source2 "SET @@global.time_zone = '+02:00';"

test_fail_job_between_event
test_syncer_metrics
test_session_config
test_query_timeout
Expand Down