From b0960446c206c09b010200213c91826b11aae28a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 23 Oct 2020 18:20:27 +0800 Subject: [PATCH 1/8] election: make sure old `elec` will resign --- pkg/election/election.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index ff530aca98..47eb527447 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -359,6 +359,9 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session e.campaignMu.Unlock() defer func() { + if err := elec.Resign(ctx); err != nil { + e.l.Warn("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) + } e.campaignMu.Lock() e.resignCh = nil e.campaignMu.Unlock() @@ -368,9 +371,6 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session for { if e.evictLeader.Get() == 1 { - if err := elec.Resign(ctx); err != nil { - e.l.Info("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) - } return } @@ -397,9 +397,6 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session case <-ctx.Done(): return case <-e.resignCh: - if err := elec.Resign(ctx); err != nil { - e.l.Info("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) - } return } } From 7adfa951502519c2a890880a966e7b448f31125f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 24 Oct 2020 20:23:45 +0800 Subject: [PATCH 2/8] add test --- pkg/election/election.go | 7 +++- pkg/election/election_test.go | 77 +++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index 47eb527447..a4165fddb9 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -351,6 +351,7 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo) } } +// watchLeader should call `elec.Resign` when exit, to remove election-key func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) { e.l.Debug("watch leader key", zap.String("key", key)) @@ -359,8 +360,12 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session e.campaignMu.Unlock() defer func() { - if err := elec.Resign(ctx); err != nil { + e.l.Info("will try resign leader") + timeoutCtx, _ := context.WithTimeout(context.Background(), time.Duration(e.sessionTTL)*time.Second) + if err := elec.Resign(timeoutCtx); err != nil { e.l.Warn("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) + } else { + e.l.Info("finish resign leader") } e.campaignMu.Lock() e.resignCh = nil diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 29061aaf8c..a0db4b2b98 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -25,7 +25,9 @@ import ( "github.com/pingcap/failpoint" "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/embed" + "go.uber.org/zap" "github.com/pingcap/dm/pkg/etcdutil" "github.com/pingcap/dm/pkg/log" @@ -402,3 +404,78 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) { c.Assert(err, IsNil) wg.Wait() } + +func (t *testElectionSuite) TestCancelCtxWontBlock(c *C) { + var ( + wg sync.WaitGroup + timeout = 3 * time.Second + sessionTTL = 60 + key = "unit-test/election-key" + ID = "member" + addr = "127.0.0.1:1234" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) + c.Assert(err, IsNil) + defer cli.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // manually create Election, don't use `NewElection` to avoid `campaignLoop` + e := &Election{ + cli: cli, + sessionTTL: sessionTTL, + key: key, + info: &CampaignerInfo{ + ID: ID, + Addr: addr, + }, + notifyBlockTime: t.notifyBlockTime, + leaderCh: make(chan *CampaignerInfo, 1), + ech: make(chan error, 1), // size 1 is enough + cancel: func() {}, // no use, but avoid panic + l: log.With(zap.String("component", "election")), + } + e.infoStr = e.info.String() + defer e.Close() + + session, err := e.newSession(ctx, newSessionDefaultRetryCnt) + c.Assert(err, IsNil) + elec := concurrency.NewElection(session, e.key) + ctx2, cancel2 := context.WithCancel(ctx) + + resp, err := cli.Get(ctx, key, clientv3.WithPrefix()) + c.Assert(err, IsNil) + c.Assert(resp.Kvs, HasLen, 0) + + go func() { + c.Assert(elec.Campaign(ctx2, e.key), IsNil) + }() + ch := elec.Observe(ctx2) + info := <-ch + leaderKey := string(info.Kvs[0].Key) + + wg.Add(1) + go func() { + e.watchLeader(ctx2, session, leaderKey, elec) + wg.Done() + }() + + time.Sleep(100 * time.Millisecond) + cancel2() + + // watchLeader will clean election-key + wg.Wait() + resp, err = cli.Get(ctx, key, clientv3.WithPrefix()) + c.Assert(err, IsNil) + c.Assert(resp.Kvs, HasLen, 0) + + elec2 := concurrency.NewElection(session, e.key) + ch2 := elec2.Observe(ctx) + select { + case info = <-ch2: + // should not observe old election-key + c.Log(info) + c.Fail() + case <-time.After(timeout): + } +} From b351255680a7484cb919946dc3ae08ca9d573b8b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 24 Oct 2020 20:48:25 +0800 Subject: [PATCH 3/8] fix CI --- pkg/election/election.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index a4165fddb9..3d7bee0072 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -361,12 +361,13 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session defer func() { e.l.Info("will try resign leader") - timeoutCtx, _ := context.WithTimeout(context.Background(), time.Duration(e.sessionTTL)*time.Second) + timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(e.sessionTTL)*time.Second) if err := elec.Resign(timeoutCtx); err != nil { e.l.Warn("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) } else { e.l.Info("finish resign leader") } + cancel() e.campaignMu.Lock() e.resignCh = nil e.campaignMu.Unlock() From 61e261bd4d0337b4ae01d9c8790574ce84122540 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 25 Oct 2020 20:45:26 +0800 Subject: [PATCH 4/8] fix test --- pkg/election/election.go | 7 ++++--- pkg/election/election_test.go | 5 ++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index 3d7bee0072..3da97ec882 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -320,15 +320,16 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio continue } + // make sure `elec.Campaign` was exited (instead of rely on `<-eleObserveCh`), to avoid data race writing `elec`'s member + cancel2() + campaignWg.Wait() + e.l.Info("become leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, leaderInfo) // become the leader now e.watchLeader(ctx, session, leaderKey, elec) e.l.Info("retire from leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, nil) // need to re-campaign oldLeaderID = "" - - cancel2() - campaignWg.Wait() } } diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index a0db4b2b98..1a0f70b10d 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -167,7 +167,7 @@ func testElection2After1(t *testElectionSuite, c *C, normalExit bool) { c.Assert(deleted, IsFalse) } else { // for abnormally exited election, session will be cleared here - c.Assert(deleted, IsTrue) + // or cleared by `watchLeader` triggered by context cancel in `e1.Close()` after #1214. so we remove test here } // e2 should become the leader @@ -473,9 +473,8 @@ func (t *testElectionSuite) TestCancelCtxWontBlock(c *C) { ch2 := elec2.Observe(ctx) select { case info = <-ch2: - // should not observe old election-key c.Log(info) - c.Fail() + c.Fatal("should not observe old election-key") case <-time.After(timeout): } } From 10d4ec654bd107a336f0f01c5bc3a69803bd0a55 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 25 Oct 2020 21:59:15 +0800 Subject: [PATCH 5/8] fix CI --- pkg/election/election.go | 2 +- pkg/election/election_test.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index 3da97ec882..2b6013c7ab 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -321,8 +321,8 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio } // make sure `elec.Campaign` was exited (instead of rely on `<-eleObserveCh`), to avoid data race writing `elec`'s member - cancel2() campaignWg.Wait() + cancel2() e.l.Info("become leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, leaderInfo) // become the leader now diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 1a0f70b10d..3450667376 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -165,10 +165,9 @@ func testElection2After1(t *testElectionSuite, c *C, normalExit bool) { if normalExit { // for normally exited election, session has already been closed before c.Assert(deleted, IsFalse) - } else { - // for abnormally exited election, session will be cleared here - // or cleared by `watchLeader` triggered by context cancel in `e1.Close()` after #1214. so we remove test here } + // for abnormally exited election, session will be cleared here + // or cleared by `watchLeader` triggered by context cancel in `e1.Close()` after #1214. so we remove test here // e2 should become the leader select { From c7ead526b4199963721811fa6ec057e44e85b4f4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 25 Oct 2020 22:10:21 +0800 Subject: [PATCH 6/8] fix CI --- pkg/election/election_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 3450667376..9511b2fe41 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -446,9 +446,7 @@ func (t *testElectionSuite) TestCancelCtxWontBlock(c *C) { c.Assert(err, IsNil) c.Assert(resp.Kvs, HasLen, 0) - go func() { - c.Assert(elec.Campaign(ctx2, e.key), IsNil) - }() + c.Assert(elec.Campaign(ctx2, e.key), IsNil) ch := elec.Observe(ctx2) info := <-ch leaderKey := string(info.Kvs[0].Key) From 9e25d980a893d10175a268479e118ea77fae3217 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 27 Oct 2020 09:54:59 +0800 Subject: [PATCH 7/8] address comment --- pkg/election/election.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/election/election.go b/pkg/election/election.go index 2b6013c7ab..d33d7c0662 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -361,12 +361,12 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session e.campaignMu.Unlock() defer func() { - e.l.Info("will try resign leader") + e.l.Debug("will try resign leader") timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(e.sessionTTL)*time.Second) if err := elec.Resign(timeoutCtx); err != nil { e.l.Warn("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) } else { - e.l.Info("finish resign leader") + e.l.Debug("finish resign leader") } cancel() e.campaignMu.Lock() From cebda3213b3a4eb79319eb0f3247e52b1816e35a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 27 Oct 2020 10:04:39 +0800 Subject: [PATCH 8/8] modify comment --- pkg/election/election_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 9511b2fe41..f68324f633 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -166,8 +166,7 @@ func testElection2After1(t *testElectionSuite, c *C, normalExit bool) { // for normally exited election, session has already been closed before c.Assert(deleted, IsFalse) } - // for abnormally exited election, session will be cleared here - // or cleared by `watchLeader` triggered by context cancel in `e1.Close()` after #1214. so we remove test here + // for abnormally exited election, session will be cleared by `watchLeader` in `e1.Close()` after #1214. // e2 should become the leader select {