From 572c62d9ec4367b2b60558f9ffd932f6ac2dbd05 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 27 Apr 2022 20:50:03 +0800 Subject: [PATCH 01/14] support find stores with tiflash_mpp label Signed-off-by: guo-shaoge --- internal/locate/region_cache.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index d71c814e37..76086b51a1 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1688,6 +1688,23 @@ func (c *RegionCache) GetTiFlashStores() []*Store { return stores } +// GetTiFlashMPPStores returns all stores with lable +func (c *RegionCache) GetTiFlashMPPStores() (res []*Store) { + allStores := c.GetTiFlashStores() + mppLabels := []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash_mpp", + }, + } + for _, store := range allStores { + if store.IsLabelsMatch(mppLabels) { + res = append(res, store) + } + } + return res +} + // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if // the latestBucketsVer is newer than the cached one. func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) { From 125a5b13d4e295058ae29e9682fe06f13326bdee Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 28 Apr 2022 21:31:45 +0800 Subject: [PATCH 02/14] update impl of GetTiFlashMPPStores() Signed-off-by: guo-shaoge --- internal/locate/region_cache.go | 55 +++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 76086b51a1..87d515c008 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -369,6 +369,11 @@ type RegionCache struct { sync.RWMutex stores map[uint64]*Store } + tiflashMPPStoreMu struct { + sync.RWMutex + needReload bool + stores []*Store + } notifyCheckCh chan struct{} closeCh chan struct{} @@ -388,6 +393,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.mu.latestVersions = make(map[uint64]RegionVerID) c.mu.sorted = btree.New(btreeDegree) c.storeMu.stores = make(map[uint64]*Store) + c.tiflashMPPStoreMu.needReload = true + c.tiflashMPPStoreMu.stores = make([]*Store, 1) c.notifyCheckCh = make(chan struct{}, 1) c.closeCh = make(chan struct{}) interval := config.GetGlobalConfig().StoresRefreshInterval @@ -1689,20 +1696,56 @@ func (c *RegionCache) GetTiFlashStores() []*Store { } // GetTiFlashMPPStores returns all stores with lable -func (c *RegionCache) GetTiFlashMPPStores() (res []*Store) { - allStores := c.GetTiFlashStores() +func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) { + c.tiflashMPPStoreMu.RLock() + needReload := c.tiflashMPPStoreMu.needReload + stores := c.tiflashMPPStoreMu.stores + c.tiflashMPPStoreMu.RUnlock() + + if needReload { + return c.ReloadTiFlashMPPStores(bo) + } + return stores, nil +} + +func (c *RegionCache) ReloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) { + // todo: thread safe? + stores, err := c.pdClient.GetAllStores(bo.GetCtx()) + if err != nil { + return nil, err + } mppLabels := []*metapb.StoreLabel{ { Key: "engine", Value: "tiflash_mpp", }, } - for _, store := range allStores { - if store.IsLabelsMatch(mppLabels) { - res = append(res, store) + for _, s := range stores { + tmpStore := &Store{ + storeID: s.GetId(), + addr: s.GetAddress(), + saddr: s.GetStatusAddress(), + storeType: tikvrpc.GetStoreTypeByMeta(s), + labels: s.GetLabels(), + // todo: ok? + state: uint64(resolved), + } + if tmpStore.IsLabelsMatch(mppLabels) { + res = append(res, tmpStore) } } - return res + + c.tiflashMPPStoreMu.Lock() + c.tiflashMPPStoreMu.stores = res + c.tiflashMPPStoreMu.Unlock() + return res, nil +} + +func (c *RegionCache) InvalidateTiFlashMPPStores() { + c.tiflashMPPStoreMu.Lock() + defer c.tiflashMPPStoreMu.Unlock() + c.tiflashMPPStoreMu.needReload = true + return } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if From 490f6614f7194ae523576ff1aecc827e2a696787 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 16 May 2022 11:05:09 +0800 Subject: [PATCH 03/14] trivial fix Signed-off-by: guo-shaoge --- internal/locate/region_cache.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 87d515c008..5e68f95e02 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1695,7 +1695,7 @@ func (c *RegionCache) GetTiFlashStores() []*Store { return stores } -// GetTiFlashMPPStores returns all stores with lable +// GetTiFlashMPPStores returns all stores with lable . func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) { c.tiflashMPPStoreMu.RLock() needReload := c.tiflashMPPStoreMu.needReload @@ -1703,13 +1703,12 @@ func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, er c.tiflashMPPStoreMu.RUnlock() if needReload { - return c.ReloadTiFlashMPPStores(bo) + return c.reloadTiFlashMPPStores(bo) } return stores, nil } -func (c *RegionCache) ReloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) { - // todo: thread safe? +func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) { stores, err := c.pdClient.GetAllStores(bo.GetCtx()) if err != nil { return nil, err @@ -1727,7 +1726,6 @@ func (c *RegionCache) ReloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, saddr: s.GetStatusAddress(), storeType: tikvrpc.GetStoreTypeByMeta(s), labels: s.GetLabels(), - // todo: ok? state: uint64(resolved), } if tmpStore.IsLabelsMatch(mppLabels) { @@ -1741,6 +1739,8 @@ func (c *RegionCache) ReloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, return res, nil } +// InvalidateTiFlashMPPStores set needReload be true, +// and will refresh tiflash_mpp store cache next time. func (c *RegionCache) InvalidateTiFlashMPPStores() { c.tiflashMPPStoreMu.Lock() defer c.tiflashMPPStoreMu.Unlock() From 7bf5c474f9fc9b8f74f0112755a2f25728db1846 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 16 May 2022 21:02:08 +0800 Subject: [PATCH 04/14] add GetTiFlashMPPRPCContextByConsistentHash Signed-off-by: guo-shaoge --- internal/client/client.go | 3 +- internal/locate/region_cache.go | 98 +++++++++++++++++++------- internal/locate/region_request.go | 17 ++++- internal/mockstore/mocktikv/session.go | 6 +- tikv/gc.go | 2 +- tikvrpc/endpoint.go | 18 ++++- 6 files changed, 108 insertions(+), 36 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 78b6e0562a..7488f89169 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -380,7 +380,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } // TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request. - enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash + // tiflash/tiflash_mpp/tidb don't use BatchCommand. + enableBatch := req.StoreTp == tikvrpc.TiKV connArray, err := c.getConnArray(addr, enableBatch) if err != nil { return nil, err diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 5e68f95e02..93be2a7fb0 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -45,6 +45,7 @@ import ( "sync/atomic" "time" "unsafe" + "strconv" "github.com/gogo/protobuf/proto" "github.com/google/btree" @@ -69,6 +70,7 @@ import ( "google.golang.org/grpc/credentials" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" + "github.com/stathat/consistent" ) const ( @@ -394,7 +396,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.mu.sorted = btree.New(btreeDegree) c.storeMu.stores = make(map[uint64]*Store) c.tiflashMPPStoreMu.needReload = true - c.tiflashMPPStoreMu.stores = make([]*Store, 1) + c.tiflashMPPStoreMu.stores = make([]*Store, 0) c.notifyCheckCh = make(chan struct{}, 1) c.closeCh = make(chan struct{}) interval := config.GetGlobalConfig().StoresRefreshInterval @@ -733,6 +735,56 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, return nil, nil } +// GetTiFlashMPPRPCContextByConsistentHash return rpcCtx of tiflash_mpp stores. +// Each mpp computation of specific region will be handled by specific tiflash_mpp node. +// 1. Get all stores with label . +// 2. Get rpcCtx that indicates where the region is stored. +// 3. Compute which tiflash_mpp node should handle this region by consistent hash. +// 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed. +func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) { + mppStores, err := c.GetTiFlashMPPStores(bo) + if err != nil { + return nil, err + } + if len(mppStores) == 0 { + return nil, errors.New("Number of tiflash_mpp node is zero") + } + + hasher := consistent.New() + mppStoreMap := make(map[string]*Store) + for _, store := range mppStores { + if _, ok := mppStoreMap[store.GetAddr()]; ok { + return nil, errors.New(fmt.Sprintf("unexpected duplicated tiflash_mpp store: %v", store.GetAddr())) + } else { + mppStoreMap[store.GetAddr()] = store + hasher.Add(store.GetAddr()) + } + } + + for _, id := range ids { + addr, err := hasher.Get(strconv.Itoa(int(id.GetID()))) + if err != nil { + return nil, err + } + rpcCtx, err := c.GetTiFlashRPCContext(bo, id, true) + if err != nil { + return nil, err + } + if rpcCtx == nil { + return nil, nil + } + if store, ok := mppStoreMap[addr]; !ok { + return nil, errors.New("unexpected missing store") + } else { + rpcCtx.Store = store + } + rpcCtx.Addr = addr + // Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx. + res = append(res, rpcCtx) + } + return res, nil +} + // KeyLocation is the region and range that a key is located. type KeyLocation struct { Region RegionVerID @@ -1713,23 +1765,16 @@ func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, if err != nil { return nil, err } - mppLabels := []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash_mpp", - }, - } for _, s := range stores { - tmpStore := &Store{ - storeID: s.GetId(), - addr: s.GetAddress(), - saddr: s.GetStatusAddress(), - storeType: tikvrpc.GetStoreTypeByMeta(s), - labels: s.GetLabels(), - state: uint64(resolved), - } - if tmpStore.IsLabelsMatch(mppLabels) { - res = append(res, tmpStore) + if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) { + res = append(res, &Store{ + storeID: s.GetId(), + addr: s.GetAddress(), + saddr: s.GetStatusAddress(), + storeType: tikvrpc.GetStoreTypeByMeta(s), + labels: s.GetLabels(), + state: uint64(resolved), + }) } } @@ -2220,20 +2265,23 @@ func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool { return true } for _, targetLabel := range labels { - match := false - for _, label := range s.labels { - if targetLabel.Key == label.Key && targetLabel.Value == label.Value { - match = true - break - } - } - if !match { + if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) { return false } } return true } +func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) { + for _, label := range labels { + if label.GetKey() == key && label.GetValue() == val { + res = true + break + } + } + return res +} + type livenessState uint32 var ( diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index f29f8d7ac2..e96251808c 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -862,6 +862,15 @@ func (s *RegionRequestSender) getRPCContext( return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) case tikvrpc.TiDB: return &RPCContext{Addr: s.storeAddr}, nil + case tikvrpc.TiFlashMPP: + rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID}) + if err != nil { + return nil, err + } + if len(rpcCtxs) != 1 { + return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs))) + } + return rpcCtxs[0], nil default: return nil, errors.Errorf("unsupported storage type: %v", et) } @@ -1253,7 +1262,9 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e } } - if ctx.Meta != nil { + if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP { + s.regionCache.InvalidateTiFlashMPPStores() + } else if ctx.Meta != nil { if s.replicaSelector != nil { s.replicaSelector.onSendFailure(bo, err) } else { @@ -1265,7 +1276,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. - if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { + if ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() { err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) @@ -1404,7 +1415,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later", zap.String("reason", regionErr.GetServerIsBusy().GetReason()), zap.Stringer("ctx", ctx)) - if ctx != nil && ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { + if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() { err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) } else { err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) diff --git a/internal/mockstore/mocktikv/session.go b/internal/mockstore/mocktikv/session.go index f29b8823ee..f03084e739 100644 --- a/internal/mockstore/mocktikv/session.go +++ b/internal/mockstore/mocktikv/session.go @@ -131,7 +131,7 @@ func (s *Session) CheckRequestContext(ctx *kvrpcpb.Context) *errorpb.Error { } } // The Peer on the Store is not leader. If it's tiflash store , we pass this check. - if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashStore(s.cluster.GetStore(storePeer.GetStoreId())) { + if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashRelatedStore(s.cluster.GetStore(storePeer.GetStoreId())) { return &errorpb.Error{ Message: *proto.String("not leader"), NotLeader: &errorpb.NotLeader{ @@ -182,9 +182,9 @@ func (s *Session) checkKeyInRegion(key []byte) bool { return regionContains(s.startKey, s.endKey, NewMvccKey(key)) } -func isTiFlashStore(store *metapb.Store) bool { +func isTiFlashRelatedStore(store *metapb.Store) bool { for _, l := range store.GetLabels() { - if l.GetKey() == "engine" && l.GetValue() == "tiflash" { + if l.GetKey() == "engine" && (l.GetValue() == "tiflash" || l.GetValue() == "tiflash_mpp") { return true } } diff --git a/tikv/gc.go b/tikv/gc.go index 7d7f0598a6..e6111b2354 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -275,7 +275,7 @@ func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Sto if store.State == metapb.StoreState_Tombstone { continue } - if tikvrpc.GetStoreTypeByMeta(store) == tikvrpc.TiFlash { + if tikvrpc.GetStoreTypeByMeta(store).IsTiFlashRelatedType() { continue } upStores = append(upStores, store) diff --git a/tikvrpc/endpoint.go b/tikvrpc/endpoint.go index 98f45ccd1e..b7d44c7fdb 100644 --- a/tikvrpc/endpoint.go +++ b/tikvrpc/endpoint.go @@ -44,6 +44,7 @@ const ( TiKV EndpointType = iota TiFlash TiDB + TiFlashMPP ) // Name returns the name of endpoint type. @@ -55,23 +56,34 @@ func (t EndpointType) Name() string { return "tiflash" case TiDB: return "tidb" + case TiFlashMPP: + return "tiflash_mpp" } return "unspecified" } +// IsTiFlashRelatedType return true if it's tiflash or tiflash_mpp. +func (t EndpointType) IsTiFlashRelatedType() bool { + return t == TiFlash || t == TiFlashMPP +} + // Constants to determine engine type. // They should be synced with PD. const ( - engineLabelKey = "engine" - engineLabelTiFlash = "tiflash" + EngineLabelKey = "engine" + EngineLabelTiFlash = "tiflash" + EngineLabelTiFlashMPP = "tiflash_mpp" ) // GetStoreTypeByMeta gets store type by store meta pb. func GetStoreTypeByMeta(store *metapb.Store) EndpointType { for _, label := range store.Labels { - if label.Key == engineLabelKey && label.Value == engineLabelTiFlash { + if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash { return TiFlash } + if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP { + return TiFlashMPP + } } return TiKV } From 5803c02ac8e22ef363e8b1f4bda79a6900c03c04 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 19 May 2022 13:25:49 +0800 Subject: [PATCH 05/14] add InvalidateTiFlashMPPStoresIfGRPCError() Signed-off-by: guo-shaoge --- internal/locate/region_cache.go | 28 +++++++++++++++++++++++++--- internal/locate/region_request.go | 2 +- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 436f8ea9cd..cd3c1c3703 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -40,18 +40,19 @@ import ( "fmt" "math/rand" "sort" + "strconv" "strings" "sync" "sync/atomic" "time" "unsafe" - "strconv" "github.com/gogo/protobuf/proto" "github.com/google/btree" "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" + "github.com/stathat/consistent" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -67,11 +68,12 @@ import ( "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" - "github.com/stathat/consistent" + "google.golang.org/grpc/status" ) const ( @@ -742,6 +744,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, // 2. Get rpcCtx that indicates where the region is stored. // 3. Compute which tiflash_mpp node should handle this region by consistent hash. // 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed. +// NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other. func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) { mppStores, err := c.GetTiFlashMPPStores(bo) if err != nil { @@ -1808,7 +1811,7 @@ func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, saddr: s.GetStatusAddress(), storeType: tikvrpc.GetStoreTypeByMeta(s), labels: s.GetLabels(), - state: uint64(resolved), + state: uint64(resolved), }) } } @@ -1819,6 +1822,25 @@ func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, return res, nil } +// InvalidateTiFlashMPPStoresIfGRPCError will invalid cache if is GRPC error. +// For now, only consider GRPC unavailable error. +func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool { + var invalidate bool + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.Unavailable: + invalidate = true + default: + } + } + if !invalidate { + return false + } + + c.InvalidateTiFlashMPPStores() + return true +} + // InvalidateTiFlashMPPStores set needReload be true, // and will refresh tiflash_mpp store cache next time. func (c *RegionCache) InvalidateTiFlashMPPStores() { diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c4e7ae2477..46fa89b087 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1281,7 +1281,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e } if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP { - s.regionCache.InvalidateTiFlashMPPStores() + s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err) } else if ctx.Meta != nil { if s.replicaSelector != nil { s.replicaSelector.onSendFailure(bo, err) From cfb0cecb358b74a15ef75fb86435bbedf3fc5fab Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 20 May 2022 14:50:14 +0800 Subject: [PATCH 06/14] fix go.mod Signed-off-by: guo-shaoge --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index dc543960d3..926887fa9f 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/stathat/consistent v1.0.0 // indirect github.com/stretchr/testify v1.7.0 github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 6db1764ee1..3e05807433 100644 --- a/go.sum +++ b/go.sum @@ -199,6 +199,8 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= From 3dbbc1f54d41fdd5945a1171efb306ffadfcd6dd Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 20 May 2022 15:42:16 +0800 Subject: [PATCH 07/14] fix lint Signed-off-by: guo-shaoge --- internal/locate/region_cache.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 3443283e5e..fa8c38fe10 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -756,14 +756,8 @@ func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffe } hasher := consistent.New() - mppStoreMap := make(map[string]*Store) for _, store := range mppStores { - if _, ok := mppStoreMap[store.GetAddr()]; ok { - return nil, errors.New(fmt.Sprintf("unexpected duplicated tiflash_mpp store: %v", store.GetAddr())) - } else { - mppStoreMap[store.GetAddr()] = store - hasher.Add(store.GetAddr()) - } + hasher.Add(store.GetAddr()) } for _, id := range ids { @@ -778,11 +772,19 @@ func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffe if rpcCtx == nil { return nil, nil } - if store, ok := mppStoreMap[addr]; !ok { - return nil, errors.New("unexpected missing store") - } else { - rpcCtx.Store = store + + var store *Store + for _, s := range mppStores { + if s.GetAddr() == addr { + store = s + break + } + } + if store == nil { + return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr)) } + + rpcCtx.Store = store rpcCtx.Addr = addr // Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx. res = append(res, rpcCtx) @@ -1867,7 +1869,6 @@ func (c *RegionCache) InvalidateTiFlashMPPStores() { c.tiflashMPPStoreMu.Lock() defer c.tiflashMPPStoreMu.Unlock() c.tiflashMPPStoreMu.needReload = true - return } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if From d58a648505648d4c830ade14c995812be9c1dfc1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 20 May 2022 17:16:32 +0800 Subject: [PATCH 08/14] empty commit to trigger ci Signed-off-by: guo-shaoge From 149aaa02fc27101205677d745bc4b6e4d055dc4b Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 22 May 2022 22:39:27 +0800 Subject: [PATCH 09/14] fix when rpcCtx is nil Signed-off-by: guo-shaoge --- internal/locate/region_request.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 46fa89b087..32c81f03ae 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -867,7 +867,9 @@ func (s *RegionRequestSender) getRPCContext( if err != nil { return nil, err } - if len(rpcCtxs) != 1 { + if rpcCtxs == nil { + return nil, nil + } else if len(rpcCtxs) != 1 { return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs))) } return rpcCtxs[0], nil From ffb8afbafaf4baa6e219f0d4b1e5ec557f1d66cf Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 23 May 2022 10:59:07 +0800 Subject: [PATCH 10/14] empty Signed-off-by: Yilin Chen From a2554537151d4ab207f496d347229cec7a38ed90 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 23 May 2022 11:02:20 +0800 Subject: [PATCH 11/14] fix go.mod Signed-off-by: guo-shaoge --- go.mod | 3 ++- go.sum | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 5695a74a55..79f79e2330 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect - github.com/stathat/consistent v1.0.0 // indirect + github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.7.0 github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 github.com/twmb/murmur3 v1.1.3 @@ -33,4 +33,5 @@ require ( go.uber.org/zap v1.20.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.43.0 + stathat.com/c/consistent v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index 3160e26d93..f95c1904db 100644 --- a/go.sum +++ b/go.sum @@ -412,3 +412,5 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= From 09bc0a16b5b7fce106bb6a930f40f85bb53153a1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 23 May 2022 11:03:26 +0800 Subject: [PATCH 12/14] empty Signed-off-by: guo-shaoge From 8eed354a7ada997f20772ccfe2f45f6ecf37f9cc Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 23 May 2022 11:05:45 +0800 Subject: [PATCH 13/14] trigger test Signed-off-by: Yilin Chen From 110e3f9437ce53840a013bb1cbcea9e50800bddc Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 23 May 2022 11:11:33 +0800 Subject: [PATCH 14/14] fix go.mod Signed-off-by: guo-shaoge --- integration_tests/go.sum | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 9a26e82a7f..3117ec94ef 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -703,6 +703,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1400,3 +1402,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=