From f5098339cec1b8298e7f34c86dea64f485e5b5ca Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 6 Jul 2020 14:26:35 +0800 Subject: [PATCH 1/8] backup: set syncpoint at background --- pkg/backup/client.go | 7 +------ pkg/backup/safe_point.go | 31 +++++++++++++++++++++++++++++++ pkg/task/backup.go | 1 + 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 127df708a..e8f8bf2c2 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -418,12 +418,7 @@ func (bc *Client) BackupRanges( finished := false for { - err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS) - if err != nil { - log.Error("update GC safePoint with TTL failed", zap.Error(err)) - return err - } - err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) + err := CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) if err != nil { log.Error("check GC safePoint failed", zap.Error(err)) return err diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 83be5c9fc..f2432de00 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -4,7 +4,9 @@ package backup import ( "context" + "time" + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" @@ -52,3 +54,32 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, brServiceSafePointID, ttl, backupTS-1) return err } + +// StartServiceSafePointKeeper will run UpdateServiceSafePoint periodicity +// hence keeping service safepoint won't lose. +func StartServiceSafePointKeeper( + ctx context.Context, + ttl int64, + pdClient pd.Client, + backupTS uint64, +) { + // At least 1 second gap, or time.NewTicker will blame us. + gapSec := time.Duration(utils.MaxInt(int(ttl/5), 1)) + tick := time.NewTicker(gapSec * time.Second) + log.Debug("ServiceSafePointKeeper started", zap.Int("gap", int(gapSec))) + go func() { + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + log.Error("failed to update service safe point, backup may fail if gc triggered", + zap.Error(err), + ) + } + } + } + }() +} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 4d7f6543a..67dae7d75 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -136,6 +136,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) + backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), backupTS) isIncrementalBackup := cfg.LastBackupTS > 0 From e5a2e5d074cd723f3b438d74cedbca115be81c07 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 6 Jul 2020 15:11:17 +0800 Subject: [PATCH 2/8] backup: use real duration --- pkg/backup/safe_point.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index f2432de00..80aebd3bd 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -6,7 +6,6 @@ import ( "context" "time" - "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" @@ -14,7 +13,8 @@ import ( ) const ( - brServiceSafePointID = "br" + brServiceSafePointID = "br" + preUpdateServiceSafePointFactor = 5 // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) @@ -63,10 +63,9 @@ func StartServiceSafePointKeeper( pdClient pd.Client, backupTS uint64, ) { - // At least 1 second gap, or time.NewTicker will blame us. - gapSec := time.Duration(utils.MaxInt(int(ttl/5), 1)) - tick := time.NewTicker(gapSec * time.Second) - log.Debug("ServiceSafePointKeeper started", zap.Int("gap", int(gapSec))) + gapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + // It would be OK since ttl won't be zero, so gapTime should > 0. + tick := time.NewTicker(gapTime) go func() { defer tick.Stop() for { From b7453d27e5b410e3e03601d7c4ef91e67b29316d Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 7 Jul 2020 16:35:01 +0800 Subject: [PATCH 3/8] backup: run check safepoint at background --- pkg/backup/client.go | 14 +------------- pkg/backup/safe_point.go | 40 +++++++++++++++++++++++++++++----------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index e8f8bf2c2..034bdcbd2 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -416,23 +416,11 @@ func (bc *Client) BackupRanges( log.Info("current backup safePoint job", zap.Uint64("backupTS", backupTS)) - finished := false for { - err := CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) - if err != nil { - log.Error("check GC safePoint failed", zap.Error(err)) - return err - } - if finished { - // Return error (if there is any) before finishing backup. - return err - } select { case err, ok := <-errCh: if !ok { - // Before finish backup, we have to make sure - // the backup ts does not fall behind with GC safepoint. - finished = true + return nil } if err != nil { return err diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 80aebd3bd..02d2a926a 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -14,7 +14,8 @@ import ( const ( brServiceSafePointID = "br" - preUpdateServiceSafePointFactor = 5 + preUpdateServiceSafePointFactor = 3 + checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) @@ -63,21 +64,38 @@ func StartServiceSafePointKeeper( pdClient pd.Client, backupTS uint64, ) { - gapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor - // It would be OK since ttl won't be zero, so gapTime should > 0. - tick := time.NewTicker(gapTime) + // It would be OK since ttl won't be zero, so gapTime should > `0. + updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + update := func(ctx context.Context) { + if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + log.Error("failed to update service safe point, backup may fail if gc triggered", + zap.Error(err), + ) + } + } + check := func(ctx context.Context) { + if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil { + log.Panic("cannot pass gc safe point check, aborting", + zap.Error(err), + zap.Uint64("backupTS", backupTS), + ) + } + } + updateTick := time.NewTicker(updateGapTime) + checkTick := time.NewTicker(checkGCSafePointGapTime) go func() { - defer tick.Stop() + defer updateTick.Stop() for { select { case <-ctx.Done(): + // Before finish backup, we have to make sure + // the backup ts does not fall behind with GC safepoint. + check(context.TODO()) return - case <-tick.C: - if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { - log.Error("failed to update service safe point, backup may fail if gc triggered", - zap.Error(err), - ) - } + case <-updateTick.C: + update(ctx) + case <-checkTick.C: + check(ctx) } } }() From aa769126352d9d450de055aaa63da698f6deeef3 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 7 Jul 2020 16:46:08 +0800 Subject: [PATCH 4/8] backup: fix some errors after merging --- pkg/backup/client.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 1fe0d4624..0c2ae550d 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -448,18 +448,13 @@ func (bc *Client) BackupRanges( log.Info("current backup safePoint job", zap.Uint64("backupTS", backupTS)) - for { - select { - case err, ok := <-errCh: - if !ok { - return nil - } - if err != nil { - return nil, err - } - case <-t.C: + for err := range errCh { + if err != nil { + return nil, err } } + + return allFiles, nil } // BackupRange make a backup of the given key range. From bb84597c403b27a92bbd2a98e0673f90690b6477 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 7 Jul 2020 17:32:08 +0800 Subject: [PATCH 5/8] backup: fix some problems --- pkg/backup/client.go | 9 --------- pkg/backup/safe_point.go | 1 + pkg/task/backup.go | 10 +++++++++- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 0c2ae550d..7ecf1edc1 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -439,15 +439,6 @@ func (bc *Client) BackupRanges( t := time.NewTicker(time.Second * 5) defer t.Stop() - backupTS := req.EndVersion - // use lastBackupTS as safePoint if exists - if req.StartVersion > 0 { - backupTS = req.StartVersion - } - - log.Info("current backup safePoint job", - zap.Uint64("backupTS", backupTS)) - for err := range errCh { if err != nil { return nil, err diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 02d2a926a..ae3cb3b50 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -85,6 +85,7 @@ func StartServiceSafePointKeeper( checkTick := time.NewTicker(checkGCSafePointGapTime) go func() { defer updateTick.Stop() + defer checkTick.Stop() for { select { case <-ctx.Done(): diff --git a/pkg/task/backup.go b/pkg/task/backup.go index cc92bfe0d..0956df988 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -136,7 +136,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) - backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), backupTS) + safePoint := backupTS + // use lastBackupTS as safePoint if exists + if cfg.LastBackupTS > 0 { + safePoint = cfg.LastBackupTS + } + + log.Info("current backup safePoint job", + zap.Uint64("safepoint", safePoint)) + backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint) isIncrementalBackup := cfg.LastBackupTS > 0 From 8e1a81e5b00534e49aad7ea413d99a64b0e74fa3 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 7 Jul 2020 17:53:44 +0800 Subject: [PATCH 6/8] backup: ensure all files are collected. --- pkg/backup/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 7ecf1edc1..333c71b70 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -403,6 +403,7 @@ func (bc *Client) BackupRanges( // we collect all files in a single goroutine to avoid thread safety issues. filesCh := make(chan []*kvproto.File, concurrency) allFiles := make([]*kvproto.File, 0, len(ranges)) + allFilesCollected := make(chan struct{}, 1) go func() { init := time.Now() start, cur := init, init @@ -412,6 +413,7 @@ func (bc *Client) BackupRanges( summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start)) } log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init))) + allFilesCollected <- struct{}{} }() go func() { @@ -445,6 +447,7 @@ func (bc *Client) BackupRanges( } } + <-allFilesCollected return allFiles, nil } From e67ef2c220c333050500e7bc911e0c44f02b9259 Mon Sep 17 00:00:00 2001 From: Hillium Date: Wed, 8 Jul 2020 18:03:25 +0800 Subject: [PATCH 7/8] backup: use select to sync files --- pkg/backup/client.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 333c71b70..fd06f3a99 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -447,8 +447,14 @@ func (bc *Client) BackupRanges( } } - <-allFilesCollected - return allFiles, nil + select { + case <-allFilesCollected: + return allFiles, nil + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) + case err := <-errCh: + return nil, errors.Trace(err) + } } // BackupRange make a backup of the given key range. From 422036d0684b79b9e3cb23f0c4af0d02113ccbc3 Mon Sep 17 00:00:00 2001 From: Hillium Date: Wed, 8 Jul 2020 18:05:01 +0800 Subject: [PATCH 8/8] backup: don't select on errCh --- pkg/backup/client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index fd06f3a99..1a6b6cc3b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -452,8 +452,6 @@ func (bc *Client) BackupRanges( return allFiles, nil case <-ctx.Done(): return nil, errors.Trace(ctx.Err()) - case err := <-errCh: - return nil, errors.Trace(err) } }