From 85f28cdbc6434593cb224e0e6085703766b13512 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Fri, 11 Nov 2022 22:43:04 +0800 Subject: [PATCH 1/7] ddl: don't update duplicate schema version when execute multi-schema change --- ddl/multi_schema_change.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index f3e02d0a74584..13db544ea9f8a 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -106,7 +106,6 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve if err != nil { return ver, err } - var schemaVersionGenerated = false subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. // We only generate 1 schema version for these sub-job. @@ -116,10 +115,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } subJobs[i] = *sub proxyJob := sub.ToProxyJob(job) - if schemaVersionGenerated { + if i != len(job.MultiSchemaInfo.SubJobs)-1 { proxyJob.MultiSchemaInfo.SkipVersion = true - } else { - schemaVersionGenerated = true } ver, err = w.runDDLJob(d, t, &proxyJob) sub.FromProxyJob(&proxyJob, ver) @@ -376,8 +373,8 @@ func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) { } tblInfo, err := t.GetTable(job.SchemaID, job.TableID) if err != nil { - return ver, err + return 0, err } job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - return ver, err + return 0, err } From 0ef4b97cd4d3b34c892443eace23311afd1c76a4 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 12 Nov 2022 08:18:35 +0800 Subject: [PATCH 2/7] fix ut --- ddl/multi_schema_change.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 13db544ea9f8a..7c05da0f62409 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -106,6 +106,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve if err != nil { return ver, err } + var schemaVersionGenerated = false subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. // We only generate 1 schema version for these sub-job. @@ -115,11 +116,16 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } subJobs[i] = *sub proxyJob := sub.ToProxyJob(job) - if i != len(job.MultiSchemaInfo.SubJobs)-1 { + if schemaVersionGenerated { proxyJob.MultiSchemaInfo.SkipVersion = true + } else { + schemaVersionGenerated = true } - ver, err = w.runDDLJob(d, t, &proxyJob) - sub.FromProxyJob(&proxyJob, ver) + proxyJobVer, err := w.runDDLJob(d, t, &proxyJob) + if proxyJobVer != 0 { + ver = proxyJobVer + } + sub.FromProxyJob(&proxyJob, proxyJobVer) if err != nil || proxyJob.Error != nil { for j := i - 1; j >= 0; j-- { job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] From acef1c4d14cd3e036dee204a1833ee76ca4cc040 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 12 Nov 2022 08:38:09 +0800 Subject: [PATCH 3/7] make code more readable --- ddl/multi_schema_change.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 7c05da0f62409..9fb9230e0fbbf 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -122,7 +122,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve schemaVersionGenerated = true } proxyJobVer, err := w.runDDLJob(d, t, &proxyJob) - if proxyJobVer != 0 { + if !proxyJob.MultiSchemaInfo.SkipVersion { ver = proxyJobVer } sub.FromProxyJob(&proxyJob, proxyJobVer) From 2ccd2d47485dad20b32a0eca9774d89d06142cdc Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 12 Nov 2022 09:25:47 +0800 Subject: [PATCH 4/7] add tests --- ddl/callback.go | 8 +++---- ddl/callback_test.go | 22 ++++++++++++------- ddl/ddl_worker.go | 2 +- ddl/job_table.go | 2 +- ddl/multi_schema_change_test.go | 38 +++++++++++++++++++++++++++------ 5 files changed, 52 insertions(+), 20 deletions(-) diff --git a/ddl/callback.go b/ddl/callback.go index 84b22cdfed944..b6160d150e717 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -48,7 +48,7 @@ type Callback interface { // OnChanged is called after a ddl statement is finished. OnChanged(err error) error // OnSchemaStateChanged is called after a schema state is changed. - OnSchemaStateChanged() + OnSchemaStateChanged(schemaVer int64) // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) // OnJobUpdated is called after the running job is updated. @@ -71,7 +71,7 @@ func (*BaseCallback) OnChanged(err error) error { } // OnSchemaStateChanged implements Callback interface. -func (*BaseCallback) OnSchemaStateChanged() { +func (*BaseCallback) OnSchemaStateChanged(schemaVer int64) { // Nothing to do. } @@ -129,7 +129,7 @@ func (c *DefaultCallback) OnChanged(err error) error { } // OnSchemaStateChanged overrides the ddl Callback interface. -func (c *DefaultCallback) OnSchemaStateChanged() { +func (c *DefaultCallback) OnSchemaStateChanged(schemaVer int64) { err := c.do.Reload() if err != nil { logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err)) @@ -166,7 +166,7 @@ func (c *ctcCallback) OnChanged(err error) error { } // OnSchemaStateChanged overrides the ddl Callback interface. -func (c *ctcCallback) OnSchemaStateChanged() { +func (c *ctcCallback) OnSchemaStateChanged(retVer int64) { err := c.do.Reload() if err != nil { logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err)) diff --git a/ddl/callback_test.go b/ddl/callback_test.go index 4e7199dbef8ca..5a97e8212689e 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -48,13 +48,14 @@ type TestDDLCallback struct { // domain to reload schema before your ddl stepping into the next state change. Do DomainReloader - onJobRunBefore func(*model.Job) - OnJobRunBeforeExported func(*model.Job) - onJobUpdated func(*model.Job) - OnJobUpdatedExported atomic.Pointer[func(*model.Job)] - onWatched func(ctx context.Context) - OnGetJobBeforeExported func(string) - OnGetJobAfterExported func(string, *model.Job) + onJobRunBefore func(*model.Job) + OnJobRunBeforeExported func(*model.Job) + onJobUpdated func(*model.Job) + OnJobUpdatedExported atomic.Pointer[func(*model.Job)] + onWatched func(ctx context.Context) + OnGetJobBeforeExported func(string) + OnGetJobAfterExported func(string, *model.Job) + OnJobSchemaStateChanged func(int64) } // OnChanged mock the same behavior with the main DDL hook. @@ -73,12 +74,17 @@ func (tc *TestDDLCallback) OnChanged(err error) error { } // OnSchemaStateChanged mock the same behavior with the main ddl hook. -func (tc *TestDDLCallback) OnSchemaStateChanged() { +func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) { if tc.Do != nil { if err := tc.Do.Reload(); err != nil { logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err)) } } + + if tc.OnJobSchemaStateChanged != nil { + tc.OnJobSchemaStateChanged(schemaVer) + return + } } // OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ee7e7f3c6594e..956f4c805347f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1028,7 +1028,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { if RunInGoTest { // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. d.mu.RLock() - d.mu.hook.OnSchemaStateChanged() + d.mu.hook.OnSchemaStateChanged(schemaVer) d.mu.RUnlock() } diff --git a/ddl/job_table.go b/ddl/job_table.go index 83585ca040704..d23f083539e87 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -293,7 +293,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { if RunInGoTest { // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. d.mu.RLock() - d.mu.hook.OnSchemaStateChanged() + d.mu.hook.OnSchemaStateChanged(schemaVer) d.mu.RUnlock() } diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index d2e62d20d736d..f901a6e2b30d1 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -524,14 +524,14 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) // Test add indexes with drop column. tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) // Test add index failed. tk.MustExec("drop table if exists t;") @@ -539,7 +539,7 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);") tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);", errno.ErrDupEntry) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);") } @@ -564,7 +564,7 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { "add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob) dom.DDL().SetHook(originHook) cancelHook.MustCancelDone(t) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) tk.MustExec("admin check table t;") @@ -1071,9 +1071,9 @@ func TestMultiSchemaChangeAlterIndexVisibility(t *testing.T) { tk.MustExec("use test;") tk.MustExec("create table t (a int, b int, index idx(b));") tk.MustExec("alter table t add index idx2(a), alter index idx visible;") - tk.MustQuery("select * from t use index (idx, idx2);").Check(testkit.Rows( /* no rows */ )) + tk.MustQuery("select * from t use index (idx, idx2);").Check(testkit.Rows( /* no rows */)) tk.MustGetErrCode("alter table t drop column b, alter index idx invisible;", errno.ErrKeyDoesNotExist) - tk.MustQuery("select a, b from t;").Check(testkit.Rows( /* no rows */ )) + tk.MustQuery("select a, b from t;").Check(testkit.Rows( /* no rows */)) } func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) { @@ -1141,6 +1141,32 @@ func TestMultiSchemaChangeUnsupportedType(t *testing.T) { "[ddl:8200]Unsupported multi schema change for modify auto id cache") } +func TestMultiSchemaChangeSchemaVersion(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t(a int, b int, c int, d int)") + tk.MustExec("insert into t values (1,2,3,4)") + + schemaVerMap := map[int64]struct{}{} + + originHook := dom.DDL().GetHook() + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobSchemaStateChanged = func(schemaVer int64) { + if schemaVer != 0 { + // No same return schemaVer during multi-schema change + _, ok := schemaVerMap[schemaVer] + assert.False(t, ok) + schemaVerMap[schemaVer] = struct{}{} + } + } + dom.DDL().SetHook(hook) + tk.MustExec("alter table t drop column b, drop column c") + tk.MustExec("alter table t add column b int, add column c int") + tk.MustExec("alter table t add index k(b), add column e int") + dom.DDL().SetHook(originHook) +} + func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) From 79614e0589b69911c376906efc2de80f043bd956 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Sat, 12 Nov 2022 09:28:18 +0800 Subject: [PATCH 5/7] fix fmt --- ddl/multi_schema_change_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index f901a6e2b30d1..f15fa26272398 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -524,14 +524,14 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add indexes with drop column. tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add index failed. tk.MustExec("drop table if exists t;") @@ -539,7 +539,7 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);") tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);", errno.ErrDupEntry) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);") } @@ -564,7 +564,7 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { "add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob) dom.DDL().SetHook(originHook) cancelHook.MustCancelDone(t) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) tk.MustExec("admin check table t;") @@ -1071,9 +1071,9 @@ func TestMultiSchemaChangeAlterIndexVisibility(t *testing.T) { tk.MustExec("use test;") tk.MustExec("create table t (a int, b int, index idx(b));") tk.MustExec("alter table t add index idx2(a), alter index idx visible;") - tk.MustQuery("select * from t use index (idx, idx2);").Check(testkit.Rows( /* no rows */)) + tk.MustQuery("select * from t use index (idx, idx2);").Check(testkit.Rows( /* no rows */ )) tk.MustGetErrCode("alter table t drop column b, alter index idx invisible;", errno.ErrKeyDoesNotExist) - tk.MustQuery("select a, b from t;").Check(testkit.Rows( /* no rows */)) + tk.MustQuery("select a, b from t;").Check(testkit.Rows( /* no rows */ )) } func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) { From 7e04783577c1a90ba7b12faefe2bc7099402a47e Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 14 Nov 2022 10:14:30 +0800 Subject: [PATCH 6/7] add test --- ddl/multi_schema_change_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index f15fa26272398..250fb692f3500 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1164,6 +1164,7 @@ func TestMultiSchemaChangeSchemaVersion(t *testing.T) { tk.MustExec("alter table t drop column b, drop column c") tk.MustExec("alter table t add column b int, add column c int") tk.MustExec("alter table t add index k(b), add column e int") + tk.MustExec("alter table t alter index k invisible, drop column e") dom.DDL().SetHook(originHook) } From a5ac738656c0541bce6e051124953aa0a2f7ce6c Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 14 Nov 2022 11:05:17 +0800 Subject: [PATCH 7/7] update --- ddl/multi_schema_change.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 9fb9230e0fbbf..ab306fe546932 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -118,11 +118,10 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve proxyJob := sub.ToProxyJob(job) if schemaVersionGenerated { proxyJob.MultiSchemaInfo.SkipVersion = true - } else { - schemaVersionGenerated = true } proxyJobVer, err := w.runDDLJob(d, t, &proxyJob) - if !proxyJob.MultiSchemaInfo.SkipVersion { + if !schemaVersionGenerated && proxyJobVer != 0 { + schemaVersionGenerated = true ver = proxyJobVer } sub.FromProxyJob(&proxyJob, proxyJobVer)