Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,15 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
tikvTransportLayerLoad: &a.tikvTransportLayerLoad,
closed: 0,
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
Expand Down
164 changes: 104 additions & 60 deletions store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,31 +152,57 @@ func fetchMorePendingRequests(
}
}

// tryLock couples a read-write mutex with a reCreating flag. Senders take the
// read lock only while the flag is clear; the re-creating side raises the flag
// under the write lock and clears it again once it has finished.
type tryLock struct {
	sync.RWMutex
	reCreating bool
}

// tryLockForSend acquires the read lock and reports true when no re-creation
// is flagged. When reCreating is set it releases the read lock again and
// reports false, so the caller holds nothing. Every true result must be paired
// with a call to unlockForSend.
func (l *tryLock) tryLockForSend() bool {
	l.RLock()
	if !l.reCreating {
		return true
	}
	l.RUnlock()
	return false
}

// unlockForSend releases the read lock taken by a successful tryLockForSend.
func (l *tryLock) unlockForSend() {
	l.RUnlock()
}

// lockForRecreate raises the reCreating flag. The write lock is held only
// while the flag is flipped, not for the duration of the re-creation itself.
func (l *tryLock) lockForRecreate() {
	l.Lock()
	defer l.Unlock()
	l.reCreating = true
}

// unlockForRecreate clears the reCreating flag under the write lock.
func (l *tryLock) unlockForRecreate() {
	l.Lock()
	defer l.Unlock()
	l.reCreating = false
}

type batchCommandsClient struct {
// The target host.
target string

conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64
tikvTransportLayerLoad *uint64
conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// clientLock protects client when re-create the streaming.
clientLock sync.Mutex
// tryLock protects client when re-create the streaming.
tryLock
}

// isStopped reports whether the closed flag, read atomically, is non-zero.
func (c *batchCommandsClient) isStopped() bool {
	closed := atomic.LoadInt32(&c.closed)
	return closed != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Hold the lock so that the stream client is not replaced by RecvLoop
// and newly added requests are not removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
Expand Down Expand Up @@ -211,10 +237,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
Expand All @@ -226,6 +249,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
zap.String("target", c.target),
)
c.client = streamClient
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This write now happens without holding the mutex; can other threads see this change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reCreateStreamingClient acquires the write lock, so other send goroutines skip this batchCommandsClient and try the next one (see tryLockForSend). @lysu

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is protected under lock already

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lockForRecreate sets a flag to exclude others from stepping into this, but it calls mutex.Unlock right after setting the flag. So this function can exclude other accesses, but I am still confused about the visibility of the write 😕

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After lockForRecreate, the reCreating flag is set, and tryLockForSend will detect it.
Maybe you can try implementing an RWMutex struct with a tryRLock API, and then you'll figure it out.


return nil
}
logutil.BgLogger().Error(
Expand All @@ -236,15 +260,15 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
return err
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64) {
defer func() {
if r := recover(); r != nil {
metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
logutil.BgLogger().Error("batchRecvLoop",
zap.Reflect("r", r),
zap.Stack("stack"))
logutil.BgLogger().Info("restart batchRecvLoop")
go c.batchRecvLoop(cfg)
go c.batchRecvLoop(cfg, tikvTransportLayerLoad)
}
}()

Expand All @@ -257,21 +281,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

b := NewBackoffer(context.Background(), math.MaxInt32)
now := time.Now()
for { // try to re-create the streaming in the loop.
if c.isStopped() {
return
}

err1 := c.reCreateStreamingClient(err)
if err1 == nil {
break
}
err2 := b.Backoff(boTiKVRPC, err1)
// As timeout is set to math.MaxInt32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
if stopped := c.reCreateStreamingClient(err); stopped {
return
}
metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
continue
Expand All @@ -294,14 +306,37 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
c.batched.Delete(requestID)
}

tikvTransportLayerLoad := resp.GetTransportLayerLoad()
if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
transportLayerLoad := resp.GetTransportLayerLoad()
if transportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
// We need to consider TiKV load only if batch-wait strategy is enabled.
atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad)
atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad)
}
}
}

// reCreateStreamingClient keeps calling reCreateStreamingClientOnce, backing
// off between attempts, until one attempt succeeds or the client is stopped.
// The reCreating flag stays raised for the whole duration, so tryLockForSend
// rejects this client in the meantime.
//
// It returns true when the client was found stopped before an attempt
// succeeded, and false once an attempt has succeeded.
func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) {
	// Forbid the batchSendLoop from using the old client.
	c.lockForRecreate()
	defer c.unlockForRecreate()

	bo := NewBackoffer(context.Background(), math.MaxInt32)
	for !c.isStopped() { // keep trying to re-create the streaming.
		createErr := c.reCreateStreamingClientOnce(err)
		if createErr == nil {
			return false
		}
		// The Backoffer is created with math.MaxInt32, so Backoff is expected
		// to always return nil here; logging the result only keeps
		// 'make errcheck' passing.
		terror.Log(bo.Backoff(boTiKVRPC, createErr))
	}
	return true
}

type batchCommandsEntry struct {
req *tikvpb.BatchCommandsRequest_Request
res chan *tikvpb.BatchCommandsResponse_Response
Expand Down Expand Up @@ -335,10 +370,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {

var bestBatchWaitSize = cfg.BatchWaitSize
for {
// Choose a connection by round-robin.
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients))
batchCommandsClient := a.batchCommandsClients[next]

entries = entries[:0]
requests = requests[:0]
requestIDs = requestIDs[:0]
Expand All @@ -347,9 +378,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
Comment thread
tiancaiamao marked this conversation as resolved.
// If the target TiKV is overload, wait a while to collect more requests.
if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold {
if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
fetchMorePendingRequests(
a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize),
cfg.MaxBatchWaitTime, &entries, &requests,
Expand All @@ -367,23 +397,40 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
bestBatchWaitSize += 1
}

length = removeCanceledRequests(&entries, &requests)
if length == 0 {
entries, requests = removeCanceledRequests(entries, requests)
if len(entries) == 0 {
continue // All requests are canceled.
}
maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
for i := 0; i < length; i++ {
requestID := uint64(i) + maxBatchID - uint64(length)
requestIDs = append(requestIDs, requestID)
}

req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
a.getClientAndSend(entries, requests, requestIDs)
}
}

// getClientAndSend picks the next batchCommandsClient, in round-robin order,
// whose tryLockForSend succeeds, assigns consecutive request IDs taken from
// that client's idAlloc counter, and sends entries/requests through it.
//
// requestIDs is the destination buffer for the IDs; one ID is appended per
// element of requests.
//
// NOTE(review): the selection loop spins without yielding for as long as
// every client rejects tryLockForSend — confirm this is acceptable.
func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) {
	// Choose a connection by round-robin.
	var target *batchCommandsClient
	for target == nil {
		a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
		// Holding the read lock keeps the client's stream from being
		// re-created while it is in use.
		if candidate := a.batchCommandsClients[a.index]; candidate.tryLockForSend() {
			target = candidate
		}
	}
	defer target.unlockForSend()

	// Reserve a contiguous range of IDs that ends at the counter's new value.
	count := uint64(len(requests))
	firstID := atomic.AddUint64(&target.idAlloc, count) - count
	for i := range requests {
		requestIDs = append(requestIDs, firstID+uint64(i))
	}

	target.send(&tikvpb.BatchCommandsRequest{
		Requests:   requests,
		RequestIds: requestIDs,
	}, entries)
}

func (a *batchConn) Close() {
Expand All @@ -399,20 +446,17 @@ func (a *batchConn) Close() {
}

// removeCanceledRequests removes canceled requests before sending.
func removeCanceledRequests(
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request) int {
validEntries := (*entries)[:0]
validRequets := (*requests)[:0]
for _, e := range *entries {
func removeCanceledRequests(entries []*batchCommandsEntry,
requests []*tikvpb.BatchCommandsRequest_Request) ([]*batchCommandsEntry, []*tikvpb.BatchCommandsRequest_Request) {
validEntries := entries[:0]
validRequets := requests[:0]
for _, e := range entries {
if !e.isCanceled() {
validEntries = append(validEntries, e)
validRequets = append(validRequets, e.req)
}
}
*entries = validEntries
*requests = validRequets
return len(*entries)
return validEntries, validRequets
}

func sendBatchRequest(
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
for i := range entries {
requests[i] = entries[i].req
}
length := removeCanceledRequests(&entries, &requests)
c.Assert(length, Equals, 2)
entries, requests = removeCanceledRequests(entries, requests)
c.Assert(len(entries), Equals, 2)
for _, e := range entries {
c.Assert(e.isCanceled(), IsFalse)
}
Expand Down