Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.7.0
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
Expand All @@ -32,4 +33,5 @@ require (
go.uber.org/zap v1.20.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.43.0
stathat.com/c/consistent v1.0.0 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down Expand Up @@ -410,3 +412,5 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
4 changes: 4 additions & 0 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down Expand Up @@ -1400,3 +1402,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
3 changes: 2 additions & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}

// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash
// tiflash/tiflash_mpp/tidb don't use BatchCommand.
enableBatch := req.StoreTp == tikvrpc.TiKV
connArray, err := c.getConnArray(addr, enableBatch)
if err != nil {
return nil, err
Expand Down
147 changes: 139 additions & 8 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -52,6 +53,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stathat/consistent"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
Expand All @@ -67,10 +69,12 @@ import (
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -371,6 +375,11 @@ type RegionCache struct {
sync.RWMutex
stores map[uint64]*Store
}
tiflashMPPStoreMu struct {
sync.RWMutex
needReload bool
stores []*Store
}
notifyCheckCh chan struct{}
closeCh chan struct{}

Expand All @@ -390,6 +399,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.mu.latestVersions = make(map[uint64]RegionVerID)
c.mu.sorted = btree.New(btreeDegree)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashMPPStoreMu.needReload = true
c.tiflashMPPStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.closeCh = make(chan struct{})
interval := config.GetGlobalConfig().StoresRefreshInterval
Expand Down Expand Up @@ -728,6 +739,59 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
return nil, nil
}

// GetTiFlashMPPRPCContextByConsistentHash return rpcCtx of tiflash_mpp stores.
// Each mpp computation of specific region will be handled by specific tiflash_mpp node.
// 1. Get all stores with label <engine, tiflash_mpp>.
// 2. Get rpcCtx that indicates where the region is stored.
// 3. Compute which tiflash_mpp node should handle this region by consistent hash.
// 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed.
// NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other.
func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) {
mppStores, err := c.GetTiFlashMPPStores(bo)
if err != nil {
return nil, err
}
if len(mppStores) == 0 {
return nil, errors.New("Number of tiflash_mpp node is zero")
}

hasher := consistent.New()
for _, store := range mppStores {
hasher.Add(store.GetAddr())
}

for _, id := range ids {
addr, err := hasher.Get(strconv.Itoa(int(id.GetID())))
if err != nil {
return nil, err
}
rpcCtx, err := c.GetTiFlashRPCContext(bo, id, true)
if err != nil {
return nil, err
}
if rpcCtx == nil {
return nil, nil
}

var store *Store
for _, s := range mppStores {
if s.GetAddr() == addr {
store = s
break
}
}
if store == nil {
return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr))
}

rpcCtx.Store = store
rpcCtx.Addr = addr
// Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx.
res = append(res, rpcCtx)
}
return res, nil
}

// KeyLocation is the region and range that a key is located.
type KeyLocation struct {
Region RegionVerID
Expand Down Expand Up @@ -1743,6 +1807,70 @@ func (c *RegionCache) GetTiFlashStores() []*Store {
return stores
}

// GetTiFlashMPPStores returns all stores with lable <engine, tiflash_mpp>.
func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) {
c.tiflashMPPStoreMu.RLock()
needReload := c.tiflashMPPStoreMu.needReload
stores := c.tiflashMPPStoreMu.stores
c.tiflashMPPStoreMu.RUnlock()

if needReload {
Comment thread
guo-shaoge marked this conversation as resolved.
return c.reloadTiFlashMPPStores(bo)
}
return stores, nil
}

func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) {
stores, err := c.pdClient.GetAllStores(bo.GetCtx())
if err != nil {
return nil, err
}
for _, s := range stores {
if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) {
res = append(res, &Store{
storeID: s.GetId(),
addr: s.GetAddress(),
saddr: s.GetStatusAddress(),
storeType: tikvrpc.GetStoreTypeByMeta(s),
labels: s.GetLabels(),
state: uint64(resolved),
})
}
}

c.tiflashMPPStoreMu.Lock()
c.tiflashMPPStoreMu.stores = res
c.tiflashMPPStoreMu.Unlock()
return res, nil
}

// InvalidateTiFlashMPPStoresIfGRPCError will invalid cache if is GRPC error.
// For now, only consider GRPC unavailable error.
func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool {
var invalidate bool
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Unavailable:
invalidate = true
default:
}
}
if !invalidate {
return false
}

c.InvalidateTiFlashMPPStores()
return true
}

// InvalidateTiFlashMPPStores set needReload be true,
// and will refresh tiflash_mpp store cache next time.
func (c *RegionCache) InvalidateTiFlashMPPStores() {
c.tiflashMPPStoreMu.Lock()
defer c.tiflashMPPStoreMu.Unlock()
c.tiflashMPPStoreMu.needReload = true
}

// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
// the latestBucketsVer is newer than the cached one.
func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) {
Expand Down Expand Up @@ -2217,20 +2345,23 @@ func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool {
return true
}
for _, targetLabel := range labels {
match := false
for _, label := range s.labels {
if targetLabel.Key == label.Key && targetLabel.Value == label.Value {
match = true
break
}
}
if !match {
if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) {
return false
}
}
return true
}

func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) {
for _, label := range labels {
if label.GetKey() == key && label.GetValue() == val {
res = true
break
}
}
return res
}

type livenessState uint32

var (
Expand Down
19 changes: 16 additions & 3 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,17 @@ func (s *RegionRequestSender) getRPCContext(
return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
case tikvrpc.TiDB:
return &RPCContext{Addr: s.storeAddr}, nil
case tikvrpc.TiFlashMPP:
rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID})
if err != nil {
return nil, err
}
if rpcCtxs == nil {
return nil, nil
} else if len(rpcCtxs) != 1 {
return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs)))
}
return rpcCtxs[0], nil
default:
return nil, errors.Errorf("unsupported storage type: %v", et)
}
Expand Down Expand Up @@ -1271,7 +1282,9 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
}
}

if ctx.Meta != nil {
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP {
s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err)
} else if ctx.Meta != nil {
if s.replicaSelector != nil {
s.replicaSelector.onSendFailure(bo, err)
} else {
Expand All @@ -1283,7 +1296,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
if ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx))
} else {
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
Expand Down Expand Up @@ -1434,7 +1447,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if ctx != nil && ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
} else {
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
Expand Down
6 changes: 3 additions & 3 deletions internal/mockstore/mocktikv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *Session) CheckRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
}
}
// The Peer on the Store is not leader. If it's tiflash store , we pass this check.
if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashStore(s.cluster.GetStore(storePeer.GetStoreId())) {
if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashRelatedStore(s.cluster.GetStore(storePeer.GetStoreId())) {
return &errorpb.Error{
Message: *proto.String("not leader"),
NotLeader: &errorpb.NotLeader{
Expand Down Expand Up @@ -182,9 +182,9 @@ func (s *Session) checkKeyInRegion(key []byte) bool {
return regionContains(s.startKey, s.endKey, NewMvccKey(key))
}

func isTiFlashStore(store *metapb.Store) bool {
func isTiFlashRelatedStore(store *metapb.Store) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == "engine" && l.GetValue() == "tiflash" {
if l.GetKey() == "engine" && (l.GetValue() == "tiflash" || l.GetValue() == "tiflash_mpp") {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion tikv/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Sto
if store.State == metapb.StoreState_Tombstone {
continue
}
if tikvrpc.GetStoreTypeByMeta(store) == tikvrpc.TiFlash {
if tikvrpc.GetStoreTypeByMeta(store).IsTiFlashRelatedType() {
continue
}
upStores = append(upStores, store)
Expand Down
18 changes: 15 additions & 3 deletions tikvrpc/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
TiKV EndpointType = iota
TiFlash
TiDB
TiFlashMPP
)

// Name returns the name of endpoint type.
Expand All @@ -55,23 +56,34 @@ func (t EndpointType) Name() string {
return "tiflash"
case TiDB:
return "tidb"
case TiFlashMPP:
return "tiflash_mpp"
}
return "unspecified"
}

// IsTiFlashRelatedType return true if it's tiflash or tiflash_mpp.
func (t EndpointType) IsTiFlashRelatedType() bool {
return t == TiFlash || t == TiFlashMPP
}

// Constants to determine engine type.
// They should be synced with PD.
const (
engineLabelKey = "engine"
engineLabelTiFlash = "tiflash"
EngineLabelKey = "engine"
EngineLabelTiFlash = "tiflash"
EngineLabelTiFlashMPP = "tiflash_mpp"
)

// GetStoreTypeByMeta gets store type by store meta pb.
func GetStoreTypeByMeta(store *metapb.Store) EndpointType {
for _, label := range store.Labels {
if label.Key == engineLabelKey && label.Value == engineLabelTiFlash {
if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash {
return TiFlash
}
if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP {
return TiFlashMPP
}
}
return TiKV
}