diff --git a/pkg/election/election.go b/pkg/election/election.go index ff530aca98..d33d7c0662 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -320,15 +320,16 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio continue } + // make sure `elec.Campaign` was exited (instead of rely on `<-eleObserveCh`), to avoid data race writing `elec`'s member + campaignWg.Wait() + cancel2() + e.l.Info("become leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, leaderInfo) // become the leader now e.watchLeader(ctx, session, leaderKey, elec) e.l.Info("retire from leader", zap.Stringer("current member", e.info)) e.notifyLeader(ctx, nil) // need to re-campaign oldLeaderID = "" - - cancel2() - campaignWg.Wait() } } @@ -351,6 +352,7 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo) } } +// watchLeader should call `elec.Resign` when exit, to remove election-key func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) { e.l.Debug("watch leader key", zap.String("key", key)) @@ -359,6 +361,14 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session e.campaignMu.Unlock() defer func() { + e.l.Debug("will try resign leader") + timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(e.sessionTTL)*time.Second) + if err := elec.Resign(timeoutCtx); err != nil { + e.l.Warn("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) + } else { + e.l.Debug("finish resign leader") + } + cancel() e.campaignMu.Lock() e.resignCh = nil e.campaignMu.Unlock() @@ -368,9 +378,6 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session for { if e.evictLeader.Get() == 1 { - if err := elec.Resign(ctx); err != nil { - e.l.Info("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) - } return } @@ -397,9 +404,6 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session case <-ctx.Done(): return case <-e.resignCh: - if err := elec.Resign(ctx); err != nil { - e.l.Info("fail to resign leader", zap.Stringer("current member", e.info), zap.Error(err)) - } return } } diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 29061aaf8c..f68324f633 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -25,7 +25,9 @@ import ( "github.com/pingcap/failpoint" "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/embed" + "go.uber.org/zap" "github.com/pingcap/dm/pkg/etcdutil" "github.com/pingcap/dm/pkg/log" @@ -163,10 +165,8 @@ func testElection2After1(t *testElectionSuite, c *C, normalExit bool) { if normalExit { // for normally exited election, session has already been closed before c.Assert(deleted, IsFalse) - } else { - // for abnormally exited election, session will be cleared here - c.Assert(deleted, IsTrue) } + // for abnormally exited election, session will be cleared by `watchLeader` in `e1.Close()` after #1214. // e2 should become the leader select { @@ -402,3 +402,75 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) { c.Assert(err, IsNil) wg.Wait() } + +func (t *testElectionSuite) TestCancelCtxWontBlock(c *C) { + var ( + wg sync.WaitGroup + timeout = 3 * time.Second + sessionTTL = 60 + key = "unit-test/election-key" + ID = "member" + addr = "127.0.0.1:1234" + ) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) + c.Assert(err, IsNil) + defer cli.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // manually create Election, don't use `NewElection` to avoid `campaignLoop` + e := &Election{ + cli: cli, + sessionTTL: sessionTTL, + key: key, + info: &CampaignerInfo{ + ID: ID, + Addr: addr, + }, + notifyBlockTime: t.notifyBlockTime, + leaderCh: make(chan *CampaignerInfo, 1), + ech: make(chan error, 1), // size 1 is enough + cancel: func() {}, // no use, but avoid panic + l: log.With(zap.String("component", "election")), + } + e.infoStr = e.info.String() + defer e.Close() + + session, err := e.newSession(ctx, newSessionDefaultRetryCnt) + c.Assert(err, IsNil) + elec := concurrency.NewElection(session, e.key) + ctx2, cancel2 := context.WithCancel(ctx) + + resp, err := cli.Get(ctx, key, clientv3.WithPrefix()) + c.Assert(err, IsNil) + c.Assert(resp.Kvs, HasLen, 0) + + c.Assert(elec.Campaign(ctx2, e.key), IsNil) + ch := elec.Observe(ctx2) + info := <-ch + leaderKey := string(info.Kvs[0].Key) + + wg.Add(1) + go func() { + e.watchLeader(ctx2, session, leaderKey, elec) + wg.Done() + }() + + time.Sleep(100 * time.Millisecond) + cancel2() + + // watchLeader will clean election-key + wg.Wait() + resp, err = cli.Get(ctx, key, clientv3.WithPrefix()) + c.Assert(err, IsNil) + c.Assert(resp.Kvs, HasLen, 0) + + elec2 := concurrency.NewElection(session, e.key) + ch2 := elec2.Observe(ctx) + select { + case info = <-ch2: + c.Log(info) + c.Fatal("should not observe old election-key") + case <-time.After(timeout): + } +}