Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func newCheckSumCommand() *cobra.Command {
zap.Uint64("totalBytes", file.GetTotalBytes()),
zap.Uint64("startVersion", file.GetStartVersion()),
zap.Uint64("endVersion", file.GetEndVersion()),
zap.Binary("startKey", file.GetStartKey()),
zap.Binary("endKey", file.GetEndKey()),
zap.Stringer("startKey", utils.WrapKey(file.GetStartKey())),
zap.Stringer("endKey", utils.WrapKey(file.GetEndKey())),
)

var data []byte
Expand Down Expand Up @@ -179,7 +179,7 @@ func newBackupMetaCommand() *cobra.Command {
log.Error(
"file ranges overlapped",
zap.Stringer("out", out),
zap.Stringer("file", file),
utils.ZapFile(file),
)
}
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ func (bc *Client) BackupRange(
}
}()
log.Info("backup started",
zap.Binary("StartKey", startKey),
zap.Binary("EndKey", endKey),
zap.Stringer("StartKey", utils.WrapKey(startKey)),
zap.Stringer("EndKey", utils.WrapKey(endKey)),
zap.Uint64("RateLimit", req.RateLimit),
zap.Uint32("Concurrency", req.Concurrency))
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -494,8 +494,8 @@ func (bc *Client) BackupRange(
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf})
log.Info("backup raw ranges",
zap.ByteString("startKey", startKey),
zap.ByteString("endKey", endKey),
zap.Stringer("startKey", utils.WrapKey(startKey)),
zap.Stringer("endKey", utils.WrapKey(endKey)),
zap.String("cf", req.Cf))
} else {
log.Info("backup time range",
Expand Down Expand Up @@ -531,10 +531,10 @@ func (bc *Client) findRegionLeader(
}
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), zap.Binary("Key", key))
zap.Reflect("Leader", region.Leader), zap.Stringer("Key", utils.WrapKey(key)))
return region.Leader, nil
}
log.Warn("no region found", zap.Binary("Key", key))
log.Warn("no region found", zap.Stringer("Key", utils.WrapKey(key)))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
Expand Down Expand Up @@ -618,8 +618,8 @@ func (bc *Client) fineGrainedBackup(
zap.Reflect("error", resp.Error))
}
log.Info("put fine grained range",
zap.Binary("StartKey", resp.StartKey),
zap.Binary("EndKey", resp.EndKey),
zap.Stringer("StartKey", utils.WrapKey(resp.StartKey)),
zap.Stringer("EndKey", utils.WrapKey(resp.EndKey)),
)
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

Expand Down Expand Up @@ -770,8 +770,8 @@ func SendBackup(
respFn func(*kvproto.BackupResponse) error,
) error {
log.Info("try backup",
zap.Binary("StartKey", req.StartKey),
zap.Binary("EndKey", req.EndKey),
zap.Stringer("StartKey", utils.WrapKey(req.StartKey)),
zap.Stringer("EndKey", utils.WrapKey(req.EndKey)),
zap.Uint64("storeID", storeID),
)
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -793,8 +793,8 @@ func SendBackup(
}
// TODO: handle errors in the resp.
log.Info("range backuped",
zap.Any("StartKey", resp.GetStartKey()),
zap.Any("EndKey", resp.GetEndKey()))
zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
zap.Stringer("EndKey", utils.WrapKey(req.GetEndKey())))
err = respFn(resp)
if err != nil {
return err
Expand Down
38 changes: 19 additions & 19 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (importer *FileImporter) Import(
rejectStoreMap map[uint64]bool,
rewriteRules *RewriteRules,
) error {
log.Debug("import file", zap.Stringer("file", file))
log.Debug("import file", utils.ZapFile(file))
// Rewrite the start key and end key of file to scan regions
var startKey, endKey []byte
var err error
Expand All @@ -203,9 +203,9 @@ func (importer *FileImporter) Import(
return err
}
log.Debug("rewrite file keys",
zap.Stringer("file", file),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey))
utils.ZapFile(file),
zap.Stringer("startKey", utils.WrapKey(startKey)),
zap.Stringer("endKey", utils.WrapKey(endKey)))

needReject := len(rejectStoreMap) > 0

Expand All @@ -226,7 +226,7 @@ func (importer *FileImporter) Import(
for _, region := range regionInfos {
if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
utils.ZapRegion(region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
Expand All @@ -235,7 +235,7 @@ func (importer *FileImporter) Import(
needReject = false
}

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
log.Debug("scan regions", utils.ZapFile(file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
regionLoop:
for _, regionInfo := range regionInfos {
Expand Down Expand Up @@ -266,10 +266,10 @@ func (importer *FileImporter) Import(
}
}
log.Error("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
utils.ZapFile(file),
utils.ZapRegion(info.Region),
zap.Stringer("startKey", utils.WrapKey(startKey)),
zap.Stringer("endKey", utils.WrapKey(endKey)),
zap.Error(errDownload))
return errDownload
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func (importer *FileImporter) Import(
}
}
log.Debug("ingest sst returns not leader error, retry it",
zap.Stringer("region", info.Region),
utils.ZapRegion(info.Region),
zap.Stringer("newLeader", newInfo.Leader))

if !checkRegionEpoch(newInfo, info) {
Expand All @@ -326,9 +326,9 @@ func (importer *FileImporter) Import(

if errIngest != nil {
log.Error("ingest file failed",
zap.Stringer("file", file),
utils.ZapFile(file),
zap.Stringer("range", downloadMeta.GetRange()),
zap.Stringer("region", info.Region),
utils.ZapRegion(info.Region),
zap.Error(errIngest))
return errIngest
}
Expand Down Expand Up @@ -379,9 +379,9 @@ func (importer *FileImporter) downloadSST(
RewriteRule: rule,
}
log.Debug("download SST",
zap.Stringer("sstMeta", &sstMeta),
zap.Stringer("file", file),
zap.Stringer("region", regionInfo.Region),
utils.ZapSSTMeta(&sstMeta),
utils.ZapFile(file),
utils.ZapRegion(regionInfo.Region),
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
Expand Down Expand Up @@ -432,8 +432,8 @@ func (importer *FileImporter) downloadRawKVSST(
RewriteRule: rule,
}
log.Debug("download SST",
zap.Stringer("sstMeta", &sstMeta),
zap.Stringer("region", regionInfo.Region),
utils.ZapSSTMeta(&sstMeta),
utils.ZapRegion(regionInfo.Region),
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
Expand Down Expand Up @@ -470,7 +470,7 @@ func (importer *FileImporter) ingestSST(
Context: reqCtx,
Sst: sstMeta,
}
log.Debug("ingest SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader))
log.Debug("ingest SST", utils.ZapSSTMeta(sstMeta), zap.Reflect("leader", leader))
resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req)
if err != nil {
return nil, errors.Trace(err)
Expand Down
17 changes: 9 additions & 8 deletions pkg/restore/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
)

// SortRanges checks if the range overlapped and sort them.
Expand All @@ -26,26 +27,26 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
if startID == endID {
rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.StartKey))
log.Warn("cannot find rewrite rule", zap.Stringer("key", utils.WrapKey(rg.StartKey)))
} else {
log.Debug(
"rewrite start key",
zap.Binary("key", rg.StartKey),
zap.Stringer("rule", rule))
zap.Stringer("key", utils.WrapKey(rg.StartKey)),
utils.ZapRewriteRule(rule))
}
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.EndKey))
log.Warn("cannot find rewrite rule", zap.Stringer("key", utils.WrapKey(rg.EndKey)))
} else {
log.Debug(
"rewrite end key",
zap.Binary("key", rg.EndKey),
zap.Stringer("rule", rule))
zap.Stringer("key", utils.WrapKey(rg.EndKey)),
utils.ZapRewriteRule(rule))
}
} else {
log.Warn("table id does not match",
zap.Binary("startKey", rg.StartKey),
zap.Binary("endKey", rg.EndKey),
zap.Stringer("startKey", utils.WrapKey(rg.StartKey)),
zap.Stringer("endKey", utils.WrapKey(rg.EndKey)),
zap.Int64("startID", startID),
zap.Int64("endID", endID))
return nil, errors.New("table id does not match")
Expand Down
21 changes: 11 additions & 10 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
)

// Constants for split retry machinery.
Expand Down Expand Up @@ -117,9 +118,9 @@ SplitRegions:
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
log.Error("no valid key",
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey),
zap.Binary("key", codec.EncodeBytes([]byte{}, key)))
zap.Stringer("startKey", utils.WrapKey(region.Region.StartKey)),
zap.Stringer("endKey", utils.WrapKey(region.Region.EndKey)),
zap.Stringer("key", utils.WrapKey(codec.EncodeBytes([]byte{}, key))))
}
return errors.Trace(errSplit)
}
Expand All @@ -129,11 +130,11 @@ SplitRegions:
}
time.Sleep(interval)
if i > 3 {
log.Warn("splitting regions failed, retry it", zap.Error(errSplit), zap.ByteStrings("keys", keys))
log.Warn("splitting regions failed, retry it", zap.Error(errSplit), zap.Array("keys", utils.WrapKeys(keys)))
}
continue SplitRegions
}
log.Debug("split regions", zap.Stringer("region", region.Region), zap.ByteStrings("keys", keys))
log.Debug("split regions", utils.ZapRegion(region.Region), zap.Array("keys", utils.WrapKeys(keys)))
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
Expand Down Expand Up @@ -226,7 +227,7 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
ok, err := rs.isScatterRegionFinished(ctx1, regionID)
if err != nil {
log.Warn("scatter region failed: do not have the region",
zap.Stringer("region", regionInfo.Region))
utils.ZapRegion(regionInfo.Region))
return
}
if ok {
Expand All @@ -251,7 +252,7 @@ func (rs *RegionSplitter) splitAndScatterRegions(
// Wait for a while until the regions successfully splits.
rs.waitForSplit(ctx, region.Region.Id)
if err = rs.client.ScatterRegion(ctx, region); err != nil {
log.Warn("scatter region failed", zap.Stringer("region", region.Region), zap.Error(err))
log.Warn("scatter region failed", utils.ZapRegion(region.Region), zap.Error(err))
}
}
return newRegions, nil
Expand Down Expand Up @@ -279,9 +280,9 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*R
}
splitKeyMap[region.Region.GetId()] = append(splitKeys, key)
log.Debug("get key for split region",
zap.Binary("key", key),
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey))
zap.Stringer("key", utils.WrapKey(key)),
zap.Stringer("startKey", utils.WrapKey(region.Region.StartKey)),
zap.Stringer("endKey", utils.WrapKey(region.Region.EndKey)))
}
}
return splitKeyMap
Expand Down
Loading