diff --git a/Makefile b/Makefile index a03cedc54..839a27b9e 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,8 @@ build_for_integration_test: -o bin/br.test # build key locker GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go + # build gc + GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go test: GO111MODULE=on go test -race -tags leak ./... diff --git a/cmd/backup.go b/cmd/backup.go index 34e1a8970..73ae6106f 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -17,11 +17,12 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupRateLimit = "ratelimit" - flagBackupConcurrency = "concurrency" - flagBackupChecksum = "checksum" - flagLastBackupTS = "lastbackupts" + flagBackupTimeago = "timeago" + flagBackupRateLimit = "ratelimit" + flagBackupRateLimitUnit = "ratelimit-unit" + flagBackupConcurrency = "concurrency" + flagBackupChecksum = "checksum" + flagLastBackupTS = "lastbackupts" ) func defineBackupFlags(flagSet *pflag.FlagSet) { @@ -36,6 +37,11 @@ func defineBackupFlags(flagSet *pflag.FlagSet) { "Run checksum after backup") flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts") _ = flagSet.MarkHidden(flagLastBackupTS) + + // Test only flag. 
+ flagSet.Uint64P( + flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task") + _ = flagSet.MarkHidden(flagBackupRateLimitUnit) } func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { @@ -57,6 +63,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { if err != nil { return err } + ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit) + if err != nil { + return err + } + ratelimit *= ratelimitUnit concurrency, err := flagSet.GetUint32(flagBackupConcurrency) if err != nil { diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 0d89ac4e1..5cba2d9bf 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" @@ -71,31 +72,31 @@ func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) { if err != nil { return 0, errors.Trace(err) } + backupTS := oracle.ComposeTS(p, l) if timeAgo != "" { duration, err := time.ParseDuration(timeAgo) if err != nil { return 0, errors.Trace(err) } - t := duration.Nanoseconds() / int64(time.Millisecond) - log.Info("backup time ago", zap.Int64("MillisecondsAgo", t)) - - // check backup time do not exceed GCSafePoint - safePoint, err := GetGCSafePoint(ctx, bc.mgr.GetPDClient()) - if err != nil { - return 0, errors.Trace(err) + if duration <= 0 { + return 0, errors.New("negative timeago is not allowed") } - if p-t < safePoint.Physical { - return 0, errors.New("given backup time exceed GCSafePoint") + log.Info("backup time ago", zap.Duration("timeago", duration)) + + backupTime := oracle.GetTimeFromTS(backupTS) + backupAgo := backupTime.Add(-duration) + if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) { + return 0, errors.New("backup ts overflow please 
choose a smaller timeago") } - p -= t + backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) } - ts := utils.Timestamp{ - Physical: p, - Logical: l, + // check backup time do not exceed GCSafePoint + err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS) + if err != nil { + return 0, errors.Trace(err) } - backupTS := utils.EncodeTs(ts) log.Info("backup encode timestamp", zap.Uint64("BackupTS", backupTS)) return backupTS, nil } @@ -281,7 +282,7 @@ func (bc *Client) BackupRanges( ranges []Range, lastBackupTS uint64, backupTS uint64, - rate uint64, + rateLimit uint64, concurrency uint32, updateCh chan<- struct{}, ) error { @@ -297,7 +298,7 @@ func (bc *Client) BackupRanges( go func() { for _, r := range ranges { err := bc.backupRange( - ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rate, concurrency, updateCh) + ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh) if err != nil { errCh <- err return @@ -342,7 +343,7 @@ func (bc *Client) backupRange( startKey, endKey []byte, lastBackupTS uint64, backupTS uint64, - rateMBs uint64, + rateLimit uint64, concurrency uint32, updateCh chan<- struct{}, ) (err error) { @@ -357,12 +358,10 @@ func (bc *Client) backupRange( summary.CollectSuccessUnit(key, elapsed) } }() - // The unit of rate limit in protocol is bytes per second. 
- rateLimit := rateMBs * 1024 * 1024 log.Info("backup started", zap.Binary("StartKey", startKey), zap.Binary("EndKey", endKey), - zap.Uint64("RateLimit", rateMBs), + zap.Uint64("RateLimit", rateLimit), zap.Uint32("Concurrency", concurrency)) ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/pkg/backup/client_test.go b/pkg/backup/client_test.go index 6971026d5..44ca1ad5a 100644 --- a/pkg/backup/client_test.go +++ b/pkg/backup/client_test.go @@ -10,11 +10,11 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/br/pkg/conn" - "github.com/pingcap/br/pkg/utils" ) type testBackup struct { @@ -61,7 +61,7 @@ func (r *testBackup) TestGetTS(c *C) { currentTs := time.Now().UnixNano() / int64(time.Millisecond) ts, err := r.backupClient.GetTS(r.ctx, timeAgo) c.Assert(err, IsNil) - pdTs := utils.DecodeTs(ts).Physical + pdTs := oracle.ExtractPhysical(ts) duration := int(currentTs - pdTs) c.Assert(duration, Greater, expectedDuration-deviation) c.Assert(duration, Less, expectedDuration+deviation) @@ -72,27 +72,30 @@ func (r *testBackup) TestGetTS(c *C) { currentTs = time.Now().UnixNano() / int64(time.Millisecond) ts, err = r.backupClient.GetTS(r.ctx, timeAgo) c.Assert(err, IsNil) - pdTs = utils.DecodeTs(ts).Physical + pdTs = oracle.ExtractPhysical(ts) duration = int(currentTs - pdTs) c.Assert(duration, Greater, expectedDuration-deviation) c.Assert(duration, Less, expectedDuration+deviation) // timeago = "-1m" timeAgo = "-1m" - expectedDuration = -60000 - currentTs = time.Now().UnixNano() / int64(time.Millisecond) - ts, err = r.backupClient.GetTS(r.ctx, timeAgo) - c.Assert(err, IsNil) - pdTs = utils.DecodeTs(ts).Physical - duration = int(currentTs - pdTs) - c.Assert(duration, Greater, expectedDuration-deviation) - c.Assert(duration, Less, 
expectedDuration+deviation) + _, err = r.backupClient.GetTS(r.ctx, timeAgo) + c.Assert(err, ErrorMatches, "negative timeago is not allowed") - // timeago = "1000000h" exceed GCSafePoint - // because GCSafePoint in mockPDClient is 0 + // timeago = "1000000h" overflows timeAgo = "1000000h" _, err = r.backupClient.GetTS(r.ctx, timeAgo) - c.Assert(err, ErrorMatches, "given backup time exceed GCSafePoint") + c.Assert(err, ErrorMatches, "backup ts overflow.*") + + // timeago = "10h" exceed GCSafePoint + p, l, err := r.backupClient.mgr.GetPDClient().GetTS(r.ctx) + c.Assert(err, IsNil) + now := oracle.ComposeTS(p, l) + _, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now) + c.Assert(err, IsNil) + timeAgo = "10h" + _, err = r.backupClient.GetTS(r.ctx, timeAgo) + c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+") } func (r *testBackup) TestBuildTableRange(c *C) { diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index bc24a01ba..bb73bc7d9 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -7,32 +7,29 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "go.uber.org/zap" - - "github.com/pingcap/br/pkg/utils" ) -// GetGCSafePoint returns the current gc safe point. +// getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. -func GetGCSafePoint(ctx context.Context, pdClient pd.Client) (utils.Timestamp, error) { +func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { safePoint, err := pdClient.UpdateGCSafePoint(ctx, 0) if err != nil { - return utils.Timestamp{}, err + return 0, err } - return utils.DecodeTs(safePoint), nil + return safePoint, nil } // CheckGCSafepoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. 
func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error { // TODO: use PDClient.GetGCSafePoint instead once PD client exports it. - safePoint, err := GetGCSafePoint(ctx, pdClient) + safePoint, err := getGCSafePoint(ctx, pdClient) if err != nil { log.Warn("fail to get GC safe point", zap.Error(err)) return nil } - safePointTS := utils.EncodeTs(safePoint) - if ts <= safePointTS { - return errors.Errorf("GC safepoint %d exceed TS %d", safePointTS, ts) + if ts <= safePoint { + return errors.Errorf("GC safepoint %d exceed TS %d", safePoint, ts) } return nil } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 02875cdb2..9714edc2a 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -16,6 +16,7 @@ import ( restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -129,11 +130,7 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) { if err != nil { return 0, errors.Trace(err) } - ts := utils.Timestamp{ - Physical: p, - Logical: l, - } - restoreTS := utils.EncodeTs(ts) + restoreTS := oracle.ComposeTS(p, l) return restoreTS, nil } diff --git a/pkg/utils/tso.go b/pkg/utils/tso.go index 44c23fc48..a4ca5f5b5 100644 --- a/pkg/utils/tso.go +++ b/pkg/utils/tso.go @@ -8,36 +8,12 @@ import ( "strings" "github.com/pingcap/errors" - "github.com/pingcap/tidb/store/tikv/oracle" ) const ( resetTSURL = "/pd/api/v1/admin/reset-ts" ) -// Timestamp is composed by a physical unix timestamp and a logical timestamp. 
-type Timestamp struct { - Physical int64 - Logical int64 -} - -const physicalShiftBits = 18 - -// DecodeTs decodes Timestamp from a uint64 -func DecodeTs(ts uint64) Timestamp { - physical := oracle.ExtractPhysical(ts) - logical := ts - (uint64(physical) << physicalShiftBits) - return Timestamp{ - Physical: physical, - Logical: int64(logical), - } -} - -// EncodeTs encodes Timestamp into a uint64 -func EncodeTs(tp Timestamp) uint64 { - return uint64((tp.Physical << physicalShiftBits) + tp.Logical) -} - // ResetTS resets the timestamp of PD to a bigger value func ResetTS(pdAddr string, ts uint64) error { req, err := json.Marshal(struct { diff --git a/pkg/utils/tso_test.go b/pkg/utils/tso_test.go deleted file mode 100644 index 3e6ecd9e5..000000000 --- a/pkg/utils/tso_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package utils - -import ( - "math/rand" - "time" - - . "github.com/pingcap/check" -) - -type testTsoSuite struct{} - -var _ = Suite(&testTsoSuite{}) - -func (r *testTsoSuite) TestTimestampEncodeDecode(c *C) { - rand.Seed(time.Now().UnixNano()) - for i := 0; i < 10; i++ { - ts := rand.Uint64() - tp := DecodeTs(ts) - ts1 := EncodeTs(tp) - c.Assert(ts, DeepEquals, ts1) - } -} diff --git a/pkg/utils/unit.go b/pkg/utils/unit.go index 5f8009878..a12dcb6c2 100644 --- a/pkg/utils/unit.go +++ b/pkg/utils/unit.go @@ -2,7 +2,7 @@ package utils // unit of storage const ( - B = 1 << (iota * 10) + B = uint64(1) << (iota * 10) KB MB GB diff --git a/pkg/utils/unit_test.go b/pkg/utils/unit_test.go new file mode 100644 index 000000000..5b3c00530 --- /dev/null +++ b/pkg/utils/unit_test.go @@ -0,0 +1,17 @@ +package utils + +import ( + . 
"github.com/pingcap/check" +) + +type testUnitSuite struct{} + +var _ = Suite(&testUnitSuite{}) + +func (r *testUnitSuite) TestLoadBackupMeta(c *C) { + c.Assert(B, Equals, uint64(1)) + c.Assert(KB, Equals, uint64(1024)) + c.Assert(MB, Equals, uint64(1024*1024)) + c.Assert(GB, Equals, uint64(1024*1024*1024)) + c.Assert(TB, Equals, uint64(1024*1024*1024*1024)) +} diff --git a/tests/br_z_gc_safepoint/gc.go b/tests/br_z_gc_safepoint/gc.go new file mode 100644 index 000000000..a18367259 --- /dev/null +++ b/tests/br_z_gc_safepoint/gc.go @@ -0,0 +1,64 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Test backup with exceeding GC safe point. 
+ +package main + +import ( + "context" + "flag" + "time" + + "github.com/pingcap/log" + pd "github.com/pingcap/pd/client" + "github.com/pingcap/tidb/store/tikv/oracle" + "go.uber.org/zap" +) + +var ( + pdAddr = flag.String("pd", "", "PD address") + gcOffset = flag.Duration("gc-offset", time.Second*10, + "Set GC safe point to current time - gc-offset, default: 10s") +) + +func main() { + flag.Parse() + if *pdAddr == "" { + log.Fatal("pd address is empty") + } + if *gcOffset == time.Duration(0) { + log.Fatal("zero gc-offset is not allowed") + } + + timeout := time.Second * 10 + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + pdclient, err := pd.NewClientWithContext(ctx, []string{*pdAddr}, pd.SecurityOption{}) + if err != nil { + log.Fatal("create pd client failed", zap.Error(err)) + } + p, l, err := pdclient.GetTS(ctx) + if err != nil { + log.Fatal("get ts failed", zap.Error(err)) + } + now := oracle.ComposeTS(p, l) + nowMinusOffset := oracle.GetTimeFromTS(now).Add(-*gcOffset) + newSP := oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0) + _, err = pdclient.UpdateGCSafePoint(ctx, newSP) + if err != nil { + log.Fatal("update GC safe point failed", zap.Error(err)) + } + + log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now)) +} diff --git a/tests/br_z_gc_safepoint/run.sh b/tests/br_z_gc_safepoint/run.sh new file mode 100755 index 000000000..916ca1fa8 --- /dev/null +++ b/tests/br_z_gc_safepoint/run.sh @@ -0,0 +1,46 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test whether BR fails fast when backup ts exceeds GC safe point. +# It is called br_*z*_gc_safepoint because it brings lots of writes and +# slows down other tests by changing the GC safe point. A z prefix is added to run +# the test last. + +set -eu + +DB="$TEST_NAME" +TABLE="usertable" + +run_sql "CREATE DATABASE $DB;" + +go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB + +row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') + +# After 10 seconds, update GC safepoint to now + 5s. +sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" & + +# Set ratelimit to 1 byte/second; we assume the backup cannot finish within 10s, +# so it will trigger the exceed GC safe point error. +backup_gc_fail=0 +echo "backup start (expect fail)..." +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 + +if [ "$backup_gc_fail" -ne "1" ];then + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +run_sql "DROP TABLE $DB.$TABLE;" diff --git a/tests/br_z_gc_safepoint/workload b/tests/br_z_gc_safepoint/workload new file mode 100644 index 000000000..448ca3c1a --- /dev/null +++ b/tests/br_z_gc_safepoint/workload @@ -0,0 +1,12 @@ +recordcount=1000 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file