Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ build_for_integration_test:
-o bin/br.test
# build key locker
GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go

test:
GO111MODULE=on go test -race -tags leak ./...
Expand Down
21 changes: 16 additions & 5 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
)

const (
flagBackupTimeago = "timeago"
flagBackupRateLimit = "ratelimit"
flagBackupConcurrency = "concurrency"
flagBackupChecksum = "checksum"
flagLastBackupTS = "lastbackupts"
flagBackupTimeago = "timeago"
flagBackupRateLimit = "ratelimit"
flagBackupRateLimitUnit = "ratelimit-unit"
flagBackupConcurrency = "concurrency"
flagBackupChecksum = "checksum"
flagLastBackupTS = "lastbackupts"
)

func defineBackupFlags(flagSet *pflag.FlagSet) {
Expand All @@ -36,6 +37,11 @@ func defineBackupFlags(flagSet *pflag.FlagSet) {
"Run checksum after backup")
flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts")
_ = flagSet.MarkHidden(flagLastBackupTS)

// Test only flag.
flagSet.Uint64P(
flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task")
_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
Expand All @@ -57,6 +63,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
if err != nil {
return err
}
ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit)
if err != nil {
return err
}
ratelimit *= ratelimitUnit

concurrency, err := flagSet.GetUint32(flagBackupConcurrency)
if err != nil {
Expand Down
39 changes: 19 additions & 20 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
Expand Down Expand Up @@ -71,31 +72,31 @@ func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) {
if err != nil {
return 0, errors.Trace(err)
}
backupTS := oracle.ComposeTS(p, l)

if timeAgo != "" {
duration, err := time.ParseDuration(timeAgo)
if err != nil {
return 0, errors.Trace(err)
}
t := duration.Nanoseconds() / int64(time.Millisecond)
log.Info("backup time ago", zap.Int64("MillisecondsAgo", t))

// check backup time do not exceed GCSafePoint
safePoint, err := GetGCSafePoint(ctx, bc.mgr.GetPDClient())
if err != nil {
return 0, errors.Trace(err)
if duration <= 0 {
return 0, errors.New("negative timeago is not allowed")
}
if p-t < safePoint.Physical {
return 0, errors.New("given backup time exceed GCSafePoint")
log.Info("backup time ago", zap.Duration("timeago", duration))

backupTime := oracle.GetTimeFromTS(backupTS)
backupAgo := backupTime.Add(-duration)
if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) {
return 0, errors.New("backup ts overflow please choose a smaller timeago")
}
p -= t
backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l)
}

ts := utils.Timestamp{
Physical: p,
Logical: l,
// check backup time do not exceed GCSafePoint
err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
return 0, errors.Trace(err)
}
backupTS := utils.EncodeTs(ts)
log.Info("backup encode timestamp", zap.Uint64("BackupTS", backupTS))
return backupTS, nil
}
Expand Down Expand Up @@ -281,7 +282,7 @@ func (bc *Client) BackupRanges(
ranges []Range,
lastBackupTS uint64,
backupTS uint64,
rate uint64,
rateLimit uint64,
concurrency uint32,
updateCh chan<- struct{},
) error {
Expand All @@ -297,7 +298,7 @@ func (bc *Client) BackupRanges(
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rate, concurrency, updateCh)
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -342,7 +343,7 @@ func (bc *Client) backupRange(
startKey, endKey []byte,
lastBackupTS uint64,
backupTS uint64,
rateMBs uint64,
rateLimit uint64,
concurrency uint32,
updateCh chan<- struct{},
) (err error) {
Expand All @@ -357,12 +358,10 @@ func (bc *Client) backupRange(
summary.CollectSuccessUnit(key, elapsed)
}
}()
// The unit of rate limit in protocol is bytes per second.
rateLimit := rateMBs * 1024 * 1024
log.Info("backup started",
zap.Binary("StartKey", startKey),
zap.Binary("EndKey", endKey),
zap.Uint64("RateLimit", rateMBs),
zap.Uint64("RateLimit", rateLimit),
zap.Uint32("Concurrency", concurrency))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
31 changes: 17 additions & 14 deletions pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/utils"
)

type testBackup struct {
Expand Down Expand Up @@ -61,7 +61,7 @@ func (r *testBackup) TestGetTS(c *C) {
currentTs := time.Now().UnixNano() / int64(time.Millisecond)
ts, err := r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, IsNil)
pdTs := utils.DecodeTs(ts).Physical
pdTs := oracle.ExtractPhysical(ts)
duration := int(currentTs - pdTs)
c.Assert(duration, Greater, expectedDuration-deviation)
c.Assert(duration, Less, expectedDuration+deviation)
Expand All @@ -72,27 +72,30 @@ func (r *testBackup) TestGetTS(c *C) {
currentTs = time.Now().UnixNano() / int64(time.Millisecond)
ts, err = r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, IsNil)
pdTs = utils.DecodeTs(ts).Physical
pdTs = oracle.ExtractPhysical(ts)
duration = int(currentTs - pdTs)
c.Assert(duration, Greater, expectedDuration-deviation)
c.Assert(duration, Less, expectedDuration+deviation)

// timeago = "-1m"
timeAgo = "-1m"
expectedDuration = -60000
currentTs = time.Now().UnixNano() / int64(time.Millisecond)
ts, err = r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, IsNil)
pdTs = utils.DecodeTs(ts).Physical
duration = int(currentTs - pdTs)
c.Assert(duration, Greater, expectedDuration-deviation)
c.Assert(duration, Less, expectedDuration+deviation)
_, err = r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, ErrorMatches, "negative timeago is not allowed")

// timeago = "1000000h" exceed GCSafePoint
// because GCSafePoint in mockPDClient is 0
// timeago = "1000000h" overflows
timeAgo = "1000000h"
_, err = r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, ErrorMatches, "given backup time exceed GCSafePoint")
c.Assert(err, ErrorMatches, "backup ts overflow.*")

// timeago = "10h" exceed GCSafePoint
p, l, err := r.backupClient.mgr.GetPDClient().GetTS(r.ctx)
c.Assert(err, IsNil)
now := oracle.ComposeTS(p, l)
_, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now)
c.Assert(err, IsNil)
timeAgo = "10h"
_, err = r.backupClient.GetTS(r.ctx, timeAgo)
c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+")
}

func (r *testBackup) TestBuildTableRange(c *C) {
Expand Down
17 changes: 7 additions & 10 deletions pkg/backup/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,29 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/utils"
)

// GetGCSafePoint returns the current gc safe point.
// getGCSafePoint returns the current gc safe point.
// TODO: Some cluster may not enable distributed GC.
func GetGCSafePoint(ctx context.Context, pdClient pd.Client) (utils.Timestamp, error) {
func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
safePoint, err := pdClient.UpdateGCSafePoint(ctx, 0)
if err != nil {
return utils.Timestamp{}, err
return 0, err
}
return utils.DecodeTs(safePoint), nil
return safePoint, nil
}

// CheckGCSafepoint checks whether the ts is older than GC safepoint.
// Note: It ignores errors other than exceed GC safepoint.
func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
// TODO: use PDClient.GetGCSafePoint instead once PD client exports it.
safePoint, err := GetGCSafePoint(ctx, pdClient)
safePoint, err := getGCSafePoint(ctx, pdClient)
if err != nil {
log.Warn("fail to get GC safe point", zap.Error(err))
return nil
}
safePointTS := utils.EncodeTs(safePoint)
if ts <= safePointTS {
return errors.Errorf("GC safepoint %d exceed TS %d", safePointTS, ts)
if ts <= safePoint {
return errors.Errorf("GC safepoint %d exceed TS %d", safePoint, ts)
}
return nil
}
7 changes: 2 additions & 5 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand Down Expand Up @@ -129,11 +130,7 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
if err != nil {
return 0, errors.Trace(err)
}
ts := utils.Timestamp{
Physical: p,
Logical: l,
}
restoreTS := utils.EncodeTs(ts)
restoreTS := oracle.ComposeTS(p, l)
return restoreTS, nil
}

Expand Down
24 changes: 0 additions & 24 deletions pkg/utils/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,12 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv/oracle"
)

const (
resetTSURL = "/pd/api/v1/admin/reset-ts"
)

// Timestamp is composed by a physical unix timestamp and a logical timestamp.
type Timestamp struct {
Physical int64
Logical int64
}

const physicalShiftBits = 18

// DecodeTs decodes Timestamp from a uint64
func DecodeTs(ts uint64) Timestamp {
physical := oracle.ExtractPhysical(ts)
logical := ts - (uint64(physical) << physicalShiftBits)
return Timestamp{
Physical: physical,
Logical: int64(logical),
}
}

// EncodeTs encodes Timestamp into a uint64
func EncodeTs(tp Timestamp) uint64 {
return uint64((tp.Physical << physicalShiftBits) + tp.Logical)
}

// ResetTS resets the timestamp of PD to a bigger value
func ResetTS(pdAddr string, ts uint64) error {
req, err := json.Marshal(struct {
Expand Down
22 changes: 0 additions & 22 deletions pkg/utils/tso_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/utils/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package utils

// unit of storage
const (
B = 1 << (iota * 10)
B = uint64(1) << (iota * 10)
KB
MB
GB
Expand Down
17 changes: 17 additions & 0 deletions pkg/utils/unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package utils

import (
. "github.com/pingcap/check"
)

type testUnitSuite struct{}

var _ = Suite(&testUnitSuite{})

func (r *testUnitSuite) TestLoadBackupMeta(c *C) {
c.Assert(B, Equals, uint64(1))
c.Assert(KB, Equals, uint64(1024))
c.Assert(MB, Equals, uint64(1024*1024))
c.Assert(GB, Equals, uint64(1024*1024*1024))
c.Assert(TB, Equals, uint64(1024*1024*1024*1024))
}
Loading