Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,30 @@ require (
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/onsi/ginkgo v1.10.3 // indirect
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191212073621-373b0fec09a1
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6
github.com/pingcap/pd v1.1.0-beta.0.20191115131715-6b7dc037010e
github.com/pingcap/tidb v1.1.0-beta.0.20191205065313-6083b21f986b
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834
github.com/pingcap/tidb-tools v3.0.8-0.20191209062450-c67149676f5c+incompatible
github.com/pingcap/tipb v0.0.0-20191126033718-169898888b24
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 // indirect
go.opencensus.io v0.22.2 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
go.uber.org/atomic v1.5.1 // indirect
go.uber.org/zap v1.13.0
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect
golang.org/x/tools v0.0.0-20191213032237-7093a17b0467 // indirect
google.golang.org/api v0.14.0
google.golang.org/grpc v1.24.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
google.golang.org/grpc v1.25.1
)
116 changes: 81 additions & 35 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func (bc *Client) fineGrainedBackup(

func onBackupResponse(
bo *tikv.Backoffer,
backupTS uint64,
lockResolver *tikv.LockResolver,
resp *backup.BackupResponse,
) (*backup.BackupResponse, int, error) {
Expand All @@ -546,8 +547,8 @@ func onBackupResponse(
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
msBeforeExpired, err1 := lockResolver.ResolveLocks(
bo, []*tikv.Lock{tikv.NewLock(lockErr)})
msBeforeExpired, _, err1 := lockResolver.ResolveLocks(
bo, backupTS, []*tikv.Lock{tikv.NewLock(lockErr)})
if err1 != nil {
return nil, 0, errors.Trace(err1)
}
Expand Down Expand Up @@ -627,7 +628,7 @@ func (bc *Client) handleFineGrained(
// Handle responses with the same backoffer.
func(resp *backup.BackupResponse) error {
response, backoffMs, err1 :=
onBackupResponse(bo, lockResolver, resp)
onBackupResponse(bo, backupTS, lockResolver, resp)
if err1 != nil {
return err1
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func buildTableRequest(
}

checksum := &tipb.ChecksumRequest{
StartTs: startTS,
ScanOn: tipb.ChecksumScanOn_Table,
Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
Rule: rule,
Expand All @@ -147,6 +146,7 @@ func buildTableRequest(
// Use low priority to reducing impact to other requests.
builder.Request.Priority = kv.PriorityLow
return builder.SetTableRanges(tableID, ranges, nil).
SetStartTS(startTS).
SetChecksumRequest(checksum).
SetConcurrency(variable.DefDistSQLScanConcurrency).
Build()
Expand All @@ -167,7 +167,6 @@ func buildIndexRequest(
}
}
checksum := &tipb.ChecksumRequest{
StartTs: startTS,
ScanOn: tipb.ChecksumScanOn_Index,
Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
Rule: rule,
Expand All @@ -179,6 +178,7 @@ func buildIndexRequest(
// Use low priority to reducing impact to other requests.
builder.Request.Priority = kv.PriorityLow
return builder.SetIndexRanges(nil, tableID, indexInfo.ID, ranges).
SetStartTS(startTS).
SetChecksumRequest(checksum).
SetConcurrency(variable.DefDistSQLScanConcurrency).
Build()
Expand Down
5 changes: 4 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -206,11 +207,13 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
conn, err := grpc.DialContext(
ctx,
store.GetAddress(),
opt,
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
Expand Down
4 changes: 0 additions & 4 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ func TestT(t *testing.T) {
TestingT(t)
}

func TestClient(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testClientSuite{})

type testClientSuite struct {
Expand Down
5 changes: 4 additions & 1 deletion pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
Expand Down Expand Up @@ -396,6 +397,8 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
if err != nil {
return errors.Trace(err)
}
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
for _, store := range stores {
opt := grpc.WithInsecure()
gctx, cancel := context.WithTimeout(ctx, time.Second*5)
Expand All @@ -405,7 +408,7 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
gctx,
store.GetAddress(),
opt,
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
Expand Down
16 changes: 7 additions & 9 deletions tests/br_key_locked/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,13 @@ func (c *Locker) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (
return 0, nil
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Prewrite: &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: primary,
StartVersion: startTs,
LockTtl: uint64(c.lockTTL.Milliseconds()),
},
prewrite := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: primary,
StartVersion: startTs,
LockTtl: uint64(c.lockTTL.Milliseconds()),
}
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, prewrite)

// Send the requests
resp, err := c.kv.SendReq(bo, req, loc.Region, time.Second*20)
Expand All @@ -290,7 +288,7 @@ func (c *Locker) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (
continue
}

prewriteResp := resp.Prewrite
prewriteResp := resp.Resp
if prewriteResp == nil {
return 0, errors.Errorf("response body missing")
}
Expand Down