From cb1d809a63a372317c780c7550b9be202b8d5ab5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 17 Jul 2019 22:59:03 +0800 Subject: [PATCH 1/5] store/tikv: reduce the lock contend between sending and re-creating streaming client --- store/tikv/client_batch.go | 72 +++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index f959deafa55bf..b2c4bd52d3b7e 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -150,6 +150,37 @@ func fetchMorePendingRequests( } } +type tryLock struct { + sync.RWMutex + reCreating bool +} + +func (l *tryLock) tryLockForSend() bool { + l.RLock() + if l.reCreating { + l.RUnlock() + return false + } + return true +} + +func (l *tryLock) unlockForSend() { + l.RUnlock() +} + +func (l *tryLock) lockForRecreate() { + l.Lock() + l.reCreating = true + l.Unlock() + +} + +func (l *tryLock) unlockForRecreate() { + l.Lock() + l.reCreating = false + l.Unlock() +} + type batchCommandsClient struct { // The target host. target string @@ -162,8 +193,8 @@ type batchCommandsClient struct { // closed indicates the batch client is closed explicitly or not. closed int32 - // clientLock protects client when re-create the streaming. - clientLock sync.Mutex + // tryLock protects client when re-create the streaming. + tryLock } func (c *batchCommandsClient) isStopped() bool { @@ -171,10 +202,6 @@ func (c *batchCommandsClient) isStopped() bool { } func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) { - // Use the lock to protect the stream client won't be replaced by RecvLoop, - // and new added request won't be removed by `failPendingRequests`. - c.clientLock.Lock() - defer c.clientLock.Unlock() for i, requestID := range request.RequestIds { c.batched.Store(requestID, entries[i]) } @@ -210,9 +237,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) { } func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { - // Hold the lock to forbid batchSendLoop using the old client. - c.clientLock.Lock() - defer c.clientLock.Unlock() c.failPendingRequests(err) // fail all pending requests. // Re-establish a application layer stream. TCP layer is handled by gRPC. @@ -224,6 +248,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { zap.String("target", c.target), ) c.client = streamClient + return true } logutil.BgLogger().Error( @@ -250,8 +275,11 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { resp, err := c.recv() if err != nil { now := time.Now() + // Forbids the batchSendLoop using the old client. + c.lockForRecreate() for { // try to re-create the streaming in the loop. if c.isStopped() { + c.unlockForRecreate() return } logutil.BgLogger().Error( @@ -267,6 +295,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { // TODO: Use a more smart backoff strategy. time.Sleep(time.Second) } + c.unlockForRecreate() metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue } @@ -327,12 +356,9 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, cfg.MaxBatchSize) requestIDs := make([]uint64, 0, cfg.MaxBatchSize) + var tikvTransportLayerLoad uint64 var bestBatchWaitSize = cfg.BatchWaitSize for { - // Choose a connection by round-robbin. - next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients)) - batchCommandsClient := a.batchCommandsClients[next] - entries = entries[:0] requests = requests[:0] requestIDs = requestIDs[:0] @@ -341,7 +367,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests) if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { - tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad) // If the target TiKV is overload, wait a while to collect more requests. if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold { fetchMorePendingRequests( @@ -365,18 +390,31 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { if length == 0 { continue // All requests are canceled. } - maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) + + // Choose a connection by round-robbin. + var cli *batchCommandsClient + for { + a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) + cli = a.batchCommandsClients[a.index] + // The lock protects the batchCommandsClient from been closed while it's inuse. + if cli.tryLockForSend() { + break + } + } + + maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(length)) for i := 0; i < length; i++ { requestID := uint64(i) + maxBatchID - uint64(length) requestIDs = append(requestIDs, requestID) } - req := &tikvpb.BatchCommandsRequest{ Requests: requests, RequestIds: requestIDs, } - batchCommandsClient.send(req, entries) + cli.send(req, entries) + tikvTransportLayerLoad = atomic.LoadUint64(cli.tikvTransportLayerLoad) + cli.unlockForSend() } } From 5e83eb287fe0a186e9ae77fb236077889ee8bbfe Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 18 Jul 2019 16:42:58 +0800 Subject: [PATCH 2/5] address comment --- store/tikv/client_batch.go | 114 ++++++++++++++++++++----------------- store/tikv/client_test.go | 4 +- 2 files changed, 63 insertions(+), 55 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index b2c4bd52d3b7e..d332121ca6b3f 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -236,7 +236,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) { }) } -func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { +func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) bool { c.failPendingRequests(err) // fail all pending requests. // Re-establish a application layer stream. TCP layer is handled by gRPC. @@ -275,27 +275,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { resp, err := c.recv() if err != nil { now := time.Now() - // Forbids the batchSendLoop using the old client. - c.lockForRecreate() - for { // try to re-create the streaming in the loop. - if c.isStopped() { - c.unlockForRecreate() - return - } - logutil.BgLogger().Error( - "batchRecvLoop error when receive", - zap.String("target", c.target), - zap.Error(err), - ) - - if c.reCreateStreamingClient(err) { - break - } - - // TODO: Use a more smart backoff strategy. - time.Sleep(time.Second) + if stopped := c.reCreateStreamingClient(err); stopped { + return } - c.unlockForRecreate() metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue } @@ -325,6 +307,31 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { } } +func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) { + // Forbids the batchSendLoop using the old client. + c.lockForRecreate() + defer c.unlockForRecreate() + for { // try to re-create the streaming in the loop. + if c.isStopped() { + c.unlockForRecreate() + return true + } + logutil.BgLogger().Error( + "batchRecvLoop error when receive", + zap.String("target", c.target), + zap.Error(err), + ) + + if c.reCreateStreamingClientOnce(err) { + break + } + + // TODO: Use a more smart backoff strategy. + time.Sleep(time.Second) + } + return false +} + type batchCommandsEntry struct { req *tikvpb.BatchCommandsRequest_Request res chan *tikvpb.BatchCommandsResponse_Response @@ -386,36 +393,40 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { bestBatchWaitSize += 1 } - length = removeCanceledRequests(&entries, &requests) - if length == 0 { + entries, requests = removeCanceledRequests(entries, requests) + if len(entries) == 0 { continue // All requests are canceled. } - // Choose a connection by round-robbin. - var cli *batchCommandsClient - for { - a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) - cli = a.batchCommandsClients[a.index] - // The lock protects the batchCommandsClient from been closed while it's inuse. - if cli.tryLockForSend() { - break - } - } + tikvTransportLayerLoad = a.getClientAndSend(entries, requests, requestIDs) + } +} - maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(length)) - for i := 0; i < length; i++ { - requestID := uint64(i) + maxBatchID - uint64(length) - requestIDs = append(requestIDs, requestID) - } - req := &tikvpb.BatchCommandsRequest{ - Requests: requests, - RequestIds: requestIDs, +func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) (tikvTransportLayerLoad uint64) { + // Choose a connection by round-robbin. + var cli *batchCommandsClient + for { + a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) + cli = a.batchCommandsClients[a.index] + // The lock protects the batchCommandsClient from been closed while it's inuse. + if cli.tryLockForSend() { + break } + } + defer cli.unlockForSend() - cli.send(req, entries) - tikvTransportLayerLoad = atomic.LoadUint64(cli.tikvTransportLayerLoad) - cli.unlockForSend() + maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests))) + for i := 0; i < len(requests); i++ { + requestID := uint64(i) + maxBatchID - uint64(len(requests)) + requestIDs = append(requestIDs, requestID) } + req := &tikvpb.BatchCommandsRequest{ + Requests: requests, + RequestIds: requestIDs, + } + + cli.send(req, entries) + return atomic.LoadUint64(cli.tikvTransportLayerLoad) } func (a *batchConn) Close() { @@ -431,20 +442,17 @@ func (a *batchConn) Close() { } // removeCanceledRequests removes canceled requests before sending. -func removeCanceledRequests( - entries *[]*batchCommandsEntry, - requests *[]*tikvpb.BatchCommandsRequest_Request) int { - validEntries := (*entries)[:0] - validRequets := (*requests)[:0] - for _, e := range *entries { +func removeCanceledRequests(entries []*batchCommandsEntry, + requests []*tikvpb.BatchCommandsRequest_Request) ([]*batchCommandsEntry, []*tikvpb.BatchCommandsRequest_Request) { + validEntries := entries[:0] + validRequets := requests[:0] + for _, e := range entries { if !e.isCanceled() { validEntries = append(validEntries, e) validRequets = append(validRequets, e.req) } } - *entries = validEntries - *requests = validRequets - return len(*entries) + return validEntries, validRequets } func sendBatchRequest( diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 5f685534e40ba..4871a78afa557 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -76,8 +76,8 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { for i := range entries { requests[i] = entries[i].req } - length := removeCanceledRequests(&entries, &requests) - c.Assert(length, Equals, 2) + entries, requests = removeCanceledRequests(entries, requests) + c.Assert(len(entries), Equals, 2) for _, e := range entries { c.Assert(e.isCanceled(), IsFalse) } From d61e65179cd8a41b668521c6badd4ad47a8e0531 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 18 Jul 2019 16:50:40 +0800 Subject: [PATCH 3/5] address comment --- store/tikv/client_batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index d332121ca6b3f..34c87addd5eed 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -311,9 +311,9 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) // Forbids the batchSendLoop using the old client. c.lockForRecreate() defer c.unlockForRecreate() + for { // try to re-create the streaming in the loop. if c.isStopped() { - c.unlockForRecreate() return true } logutil.BgLogger().Error( From 137ecd30c4c79194580628411fd109ca7ab9b800 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Jul 2019 16:33:07 +0800 Subject: [PATCH 4/5] address comment --- store/tikv/client.go | 15 +++++++-------- store/tikv/client_batch.go | 21 ++++++++++----------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 3278a2ebb9fd8..4f3049236ee4f 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -152,16 +152,15 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint return errors.Trace(err) } batchClient := &batchCommandsClient{ - target: a.target, - conn: conn, - client: streamClient, - batched: sync.Map{}, - idAlloc: 0, - tikvTransportLayerLoad: &a.tikvTransportLayerLoad, - closed: 0, + target: a.target, + conn: conn, + client: streamClient, + batched: sync.Map{}, + idAlloc: 0, + closed: 0, } a.batchCommandsClients = append(a.batchCommandsClients, batchClient) - go batchClient.batchRecvLoop(cfg.TiKVClient) + go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad) } } go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 34c87addd5eed..fa793a57a4388 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -185,11 +185,10 @@ type batchCommandsClient struct { // The target host. target string - conn *grpc.ClientConn - client tikvpb.Tikv_BatchCommandsClient - batched sync.Map - idAlloc uint64 - tikvTransportLayerLoad *uint64 + conn *grpc.ClientConn + client tikvpb.Tikv_BatchCommandsClient + batched sync.Map + idAlloc uint64 // closed indicates the batch client is closed explicitly or not. closed int32 @@ -259,7 +258,7 @@ func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) bool { return false } -func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { +func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64) { defer func() { if r := recover(); r != nil { metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() @@ -267,7 +266,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { zap.Reflect("r", r), zap.Stack("stack")) logutil.BgLogger().Info("restart batchRecvLoop") - go c.batchRecvLoop(cfg) + go c.batchRecvLoop(cfg, tikvTransportLayerLoad) } }() @@ -299,10 +298,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { c.batched.Delete(requestID) } - tikvTransportLayerLoad := resp.GetTransportLayerLoad() - if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 { + transportLayerLoad := resp.GetTransportLayerLoad() + if transportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 { // We need to consider TiKV load only if batch-wait strategy is enabled. - atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad) + atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad) } } } @@ -426,7 +425,7 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []* } cli.send(req, entries) - return atomic.LoadUint64(cli.tikvTransportLayerLoad) + return atomic.LoadUint64(&a.tikvTransportLayerLoad) } func (a *batchConn) Close() { From bbe7d7a3c082485a5f55fc2ab8f26e511ab5942c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Jul 2019 17:02:00 +0800 Subject: [PATCH 5/5] address comment --- store/tikv/client_batch.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index a1550e0232a81..bb4da4a8b71e3 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -368,7 +368,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, cfg.MaxBatchSize) requestIDs := make([]uint64, 0, cfg.MaxBatchSize) - var tikvTransportLayerLoad uint64 var bestBatchWaitSize = cfg.BatchWaitSize for { entries = entries[:0] @@ -380,7 +379,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { // If the target TiKV is overload, wait a while to collect more requests. - if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold { + if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) { fetchMorePendingRequests( a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime, &entries, &requests, @@ -403,11 +402,11 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { continue // All requests are canceled. } - tikvTransportLayerLoad = a.getClientAndSend(entries, requests, requestIDs) + a.getClientAndSend(entries, requests, requestIDs) } } -func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) (tikvTransportLayerLoad uint64) { +func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) { // Choose a connection by round-robbin. var cli *batchCommandsClient for { @@ -431,7 +430,7 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []* } cli.send(req, entries) - return atomic.LoadUint64(&a.tikvTransportLayerLoad) + return } func (a *batchConn) Close() {