Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newTableRestoreCommand() *cobra.Command {
func newTiflashReplicaRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "tiflash-replica",
Short: "restore the tiflash replica before the last restore, it must only be used after the last restore failed",
Short: "restore the tiflash replica removed by a failed restore of the older version BR",
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreTiflashReplicaCommand(cmd, "Restore TiFlash Replica")
},
Expand Down
6 changes: 2 additions & 4 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,6 @@ func (rc *Client) setSpeedLimit() error {
func (rc *Client) RestoreFiles(
files []*backup.File,
rewriteRules *RewriteRules,
rejectStoreMap map[uint64]bool,
updateCh glue.Progress,
) (err error) {
start := time.Now()
Expand All @@ -649,7 +648,7 @@ func (rc *Client) RestoreFiles(
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
return rc.fileImporter.Import(ectx, fileReplica, rejectStoreMap, rewriteRules)
return rc.fileImporter.Import(ectx, fileReplica, rewriteRules)
})
}
if err := eg.Wait(); err != nil {
Expand Down Expand Up @@ -682,13 +681,12 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
return errors.Trace(err)
}

emptyRules := &RewriteRules{}
for _, file := range files {
fileReplica := file
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
return rc.fileImporter.Import(ectx, fileReplica, nil, emptyRules)
return rc.fileImporter.Import(ectx, fileReplica, EmptyRewriteRule())
})
}
if err := eg.Wait(); err != nil {
Expand Down
19 changes: 0 additions & 19 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error {
func (importer *FileImporter) Import(
ctx context.Context,
file *backup.File,
rejectStoreMap map[uint64]bool,
rewriteRules *RewriteRules,
) error {
log.Debug("import file", utils.ZapFile(file))
Expand All @@ -201,8 +200,6 @@ func (importer *FileImporter) Import(
zap.Stringer("startKey", utils.WrapKey(startKey)),
zap.Stringer("endKey", utils.WrapKey(endKey)))

needReject := len(rejectStoreMap) > 0

err = utils.WithRetry(ctx, func() error {
tctx, cancel := context.WithTimeout(ctx, importScanRegionTime)
defer cancel()
Expand All @@ -213,22 +210,6 @@ func (importer *FileImporter) Import(
return errors.Trace(errScanRegion)
}

if needReject {
// TODO remove when TiFlash support restore
startTime := time.Now()
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap))
for _, region := range regionInfos {
if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) {
log.Error("waiting for removing rejected stores failed",
utils.ZapRegion(region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime)))
needReject = false
}

log.Debug("scan regions", utils.ZapFile(file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
regionLoop:
Expand Down
27 changes: 5 additions & 22 deletions pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/pingcap/parser/model"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -135,35 +134,19 @@ type BatchSender interface {
}

type tikvSender struct {
client *Client
updateCh glue.Progress
rejectStoreMap map[uint64]bool
client *Client
updateCh glue.Progress
}

// NewTiKVSender make a sender that send restore requests to TiKV.
func NewTiKVSender(
ctx context.Context,
cli *Client,
updateCh glue.Progress,
// TODO remove this field after we support TiFlash.
removeTiFlash bool,
) (BatchSender, error) {
rejectStoreMap := make(map[uint64]bool)
if removeTiFlash {
tiflashStores, err := conn.GetAllTiKVStores(ctx, cli.GetPDClient(), conn.TiFlashOnly)
if err != nil {
log.Error("failed to get and remove TiFlash replicas", zap.Error(err))
return nil, err
}
for _, store := range tiflashStores {
rejectStoreMap[store.GetId()] = true
}
}

return &tikvSender{
client: cli,
updateCh: updateCh,
rejectStoreMap: rejectStoreMap,
client: cli,
updateCh: updateCh,
}, nil
}

Expand All @@ -181,7 +164,7 @@ func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rew
files = append(files, fs.Files...)
}

if err := b.client.RestoreFiles(files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil {
if err := b.client.RestoreFiles(files, rewriteRules, b.updateCh); err != nil {
return err
}

Expand Down
56 changes: 0 additions & 56 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,62 +497,6 @@ func PaginateScanRegion(
return regions, nil
}

func hasRejectStorePeer(
ctx context.Context,
client SplitClient,
regionID uint64,
rejectStores map[uint64]bool,
) (bool, error) {
regionInfo, err := client.GetRegionByID(ctx, regionID)
if err != nil {
return false, err
}
if regionInfo == nil {
return false, nil
}
for _, peer := range regionInfo.Region.GetPeers() {
if rejectStores[peer.GetStoreId()] {
return true, nil
}
}
retryTimes := ctx.Value(retryTimes).(int)
if retryTimes > 10 {
log.Warn("get region info", utils.ZapRegion(regionInfo.Region))
}
return false, nil
}

func waitForRemoveRejectStores(
ctx context.Context,
client SplitClient,
regionInfo *RegionInfo,
rejectStores map[uint64]bool,
) bool {
interval := RejectStoreCheckInterval
regionID := regionInfo.Region.GetId()
for i := 0; i < RejectStoreCheckRetryTimes; i++ {
ctx1 := context.WithValue(ctx, retryTimes, i)
ok, err := hasRejectStorePeer(ctx1, client, regionID, rejectStores)
if err != nil {
log.Warn("wait for rejecting store failed",
utils.ZapRegion(regionInfo.Region),
zap.Error(err))
return false
}
// Do not have any peer in the rejected store, return true
if !ok {
return true
}
interval = 2 * interval
if interval > RejectStoreMaxCheckInterval {
interval = RejectStoreMaxCheckInterval
}
time.Sleep(interval)
}

return false
}

// ZapTables make zap field of table for debuging, including table names.
func ZapTables(tables []CreatedTable) zapcore.Field {
tableNames := make([]string, 0, len(tables))
Expand Down
3 changes: 1 addition & 2 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -176,7 +175,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
Expand Down Expand Up @@ -126,7 +125,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
25 changes: 11 additions & 14 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type Config struct {
Filter filter.MySQLReplicationRules

TableFilter filter.Filter `json:"-" toml:"-"`
RemoveTiFlash bool `json:"remove-tiflash" toml:"remove-tiflash"`
CheckRequirements bool `json:"check-requirements" toml:"check-requirements"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
}
Expand All @@ -137,6 +136,8 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

flags.Uint64(flagRateLimitUnit, utils.MB, "The unit of rate limit")
_ = flags.MarkHidden(flagRateLimitUnit)
_ = flags.MarkDeprecated(flagRemoveTiFlash,
"TiFlash is fully supported by BR now, removing TiFlash isn't needed any more. This flag would be ignored.")

flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
Expand Down Expand Up @@ -221,11 +222,6 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.RateLimit = rateLimit * rateLimitUnit

cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash)
if err != nil {
return errors.Trace(err)
}

var caseSensitive bool
if filterFlag := flags.Lookup(flagFilter); filterFlag != nil {
f, err := filter.Parse(filterFlag.Value.(pflag.SliceValue).GetSlice())
Expand Down Expand Up @@ -282,14 +278,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}

// newMgr creates a new mgr at the given PD address.
func newMgr(
ctx context.Context,
g glue.Glue,
pds []string,
func newMgr(ctx context.Context,
g glue.Glue, pds []string,
tlsConfig TLSConfig,
storeBehavior conn.StoreBehavior,
checkRequirements bool,
) (*conn.Mgr, error) {
checkRequirements bool) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
err error
Expand All @@ -315,7 +307,12 @@ func newMgr(
if err != nil {
return nil, err
}
return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, storeBehavior, checkRequirements)

// Is it necessary to remove `StoreBehavior`?
return conn.NewMgr(ctx, g,
pdAddress, store.(tikv.Storage),
tlsConf, securityOption,
conn.SkipTiFlash, checkRequirements)
}

// GetStorage gets the storage backend from the config.
Expand Down
36 changes: 4 additions & 32 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash)
if err != nil {
return errors.Trace(err)
}
err = cfg.Config.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -97,7 +93,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -250,15 +246,15 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
int64(rangeSize+len(files)+len(tables)),
!cfg.LogProgress)
defer updateCh.Close()
sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.RemoveTiFlash)
sender, err := restore.NewTiKVSender(ctx, client, updateCh)
if err != nil {
return err
}
manager := restore.NewBRContextManager(client)
batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh)
batcher.SetThreshold(batchSize)
batcher.EnableAutoCommit(ctx, time.Second)
go restoreTableStream(ctx, rangeStream, cfg.RemoveTiFlash, cfg.PD, client, batcher, errCh)
go restoreTableStream(ctx, rangeStream, batcher, errCh)

var finish <-chan struct{}
// Checksum
Expand Down Expand Up @@ -372,7 +368,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -439,10 +435,6 @@ func enableTiDBConfig() {
func restoreTableStream(
ctx context.Context,
inputCh <-chan restore.TableWithRange,
// TODO: remove this field and rules field after we support TiFlash
removeTiFlashReplica bool,
pdAddr []string,
client *restore.Client,
batcher *restore.Batcher,
errCh chan<- error,
) {
Expand All @@ -454,10 +446,6 @@ func restoreTableStream(
log.Info("doing postwork",
zap.Int("table count", len(oldTables)),
)
if err := client.RecoverTiFlashReplica(oldTables); err != nil {
log.Error("failed on recover TiFlash replicas", zap.Error(err))
errCh <- err
}
}()

for {
Expand All @@ -469,22 +457,6 @@ func restoreTableStream(
if !ok {
return
}
if removeTiFlashReplica {
rules, err := client.GetPlacementRules(pdAddr)
if err != nil {
errCh <- err
return
}
log.Debug("get rules", zap.Any("rules", rules), zap.Strings("pd", pdAddr))
log.Debug("try to remove tiflash of table", zap.Stringer("table name", t.Table.Name))
tiFlashRep, err := client.RemoveTiFlashOfTable(t.CreatedTable, rules)
if err != nil {
log.Error("failed on remove TiFlash replicas", zap.Error(err))
errCh <- err
return
}
t.OldTable.TiFlashReplicas = tiFlashRep
}
oldTables = append(oldTables, t.OldTable)

batcher.Add(t)
Expand Down
Loading