From d0805f588559587d409b8a5b564bf6d094c354fc Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Fri, 1 Nov 2019 11:33:58 +0800 Subject: [PATCH 1/4] restore: add metrics Signed-off-by: 5kbpers --- cmd/cmd.go | 15 +++++++++++++++ pkg/restore/client.go | 13 ++++++++++--- pkg/restore/metrics.go | 29 +++++++++++++++++++++++++++++ pkg/utils/metrics.go | 20 ++++++++++++++++++++ 4 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 pkg/restore/metrics.go create mode 100644 pkg/utils/metrics.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 499eea6bd..138a7dd08 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -4,9 +4,11 @@ import ( "context" "net/http" "sync" + "time" "github.com/pingcap/br/pkg/meta" "github.com/pingcap/br/pkg/raw" + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/spf13/cobra" @@ -37,6 +39,8 @@ const ( FlagLogFile = "log-file" // FlagStatusAddr is the name of status-addr flag. FlagStatusAddr = "status-addr" + // FlagPrometheusAddr is the name of prometheus-addr flag + FlagPrometheusAddr = "prometheus-addr" ) // AddFlags adds flags to the given cmd. @@ -53,6 +57,8 @@ func AddFlags(cmd *cobra.Command) { "Set the log file path. If not set, logs will output to stdout") cmd.PersistentFlags().String(FlagStatusAddr, "localhost:6060", "Set the HTTP listening address for the status report service. Set to empty string to disable") + cmd.PersistentFlags().String(FlagPrometheusAddr, "", + "Set the HTTP address for the prometheus metrics service. Set to empty string to disable") cmd.MarkFlagRequired(FlagPD) cmd.MarkFlagRequired(FlagStorage) } @@ -95,6 +101,15 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) { }() } + prometheusAddr, e := cmd.Flags().GetString(FlagPrometheusAddr) + if e != nil { + err = e + return + } + go func() { + utils.PushPrometheus("br", prometheusAddr, time.Second * 10) + }() + // Initialize the main program. var addr string addr, err = cmd.Flags().GetString(FlagPD) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 64fd25ca7..07700e653 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -191,9 +191,9 @@ func (rc *Client) CreateTable(table *utils.Table) (*restore_util.RewriteRules, e // RestoreTable tries to restore the data of a table. func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.RewriteRules, restoreTS uint64) error { + label := fmt.Sprintf("%s.%s", table.Db.Name, table.Schema.Name) log.Info("start to restore table", - zap.Stringer("table", table.Schema.Name), - zap.Stringer("db", table.Db.Name), + zap.String("label", label), zap.Array("files", files(table.Files)), zap.Reflect("rewriteRules", rewriteRules), ) @@ -209,7 +209,13 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re select { case <-rc.ctx.Done(): errCh <- nil - case errCh <- rc.fileImporter.Import(file, encodedRules, rc.workerPool): + default : + startTime := time.Now() + err := rc.fileImporter.Import(file, encodedRules, rc.workerPool) + if err == nil { + restoreFileHistogram.Observe(time.Since(startTime).Seconds()) + } + errCh <- err } }(file) } @@ -225,6 +231,7 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re ) return err } + restoreFileCounter.WithLabelValues(label).Add(1) } log.Info( "finish to restore table", diff --git a/pkg/restore/metrics.go b/pkg/restore/metrics.go new file mode 100644 index 000000000..a6e5be45f --- /dev/null +++ b/pkg/restore/metrics.go @@ -0,0 +1,29 @@ +package restore + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + restoreFileCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "br", + Subsystem: "restore", + Name: "restore_file", + Help: "Restore file statistic.", + }, []string{"type"}) + + restoreFileHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "br", + Subsystem: "restore", + Name: "region_file_seconds", + Help: "Restore file latency distributions.", + Buckets: prometheus.ExponentialBuckets(0.05, 2, 16), + }) +) + +func init() { + prometheus.MustRegister(restoreFileCounter) + prometheus.MustRegister(restoreFileHistogram) +} diff --git a/pkg/utils/metrics.go b/pkg/utils/metrics.go new file mode 100644 index 000000000..6cfd155ef --- /dev/null +++ b/pkg/utils/metrics.go @@ -0,0 +1,20 @@ +package utils + +import ( + "time" + + "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/push" +) + +// PushPrometheus pushes metrics to Prometheus Pushgateway. +func PushPrometheus(job, addr string, interval time.Duration) { + for { + err := push.New(addr, job).Gatherer(prometheus.DefaultGatherer).Push() + if err != nil { + log.Warn("could not push metrics to Prometheus Pushgateway") + } + time.Sleep(interval) + } +} From cc00530b851e03f79fa63429fdaa7d726335285d Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Fri, 1 Nov 2019 15:35:26 +0800 Subject: [PATCH 2/4] cmd: add status server Signed-off-by: 5kbpers --- cmd/cmd.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 138a7dd08..7ec94685a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -3,6 +3,7 @@ package cmd import ( "context" "net/http" + "net/http/pprof" "sync" "time" @@ -11,8 +12,8 @@ import ( "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" - "go.uber.org/zap" ) var ( @@ -92,13 +93,7 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) { return } if len(statusAddr) != 0 { - go func() { - if e := http.ListenAndServe(statusAddr, nil); e != nil { - log.Warn("fail to start pprof", zap.String("addr", statusAddr), zap.Error(e)) - } else { - log.Info("start pprof", zap.String("addr", statusAddr)) - } - }() + RunServe(statusAddr) } prometheusAddr, e := cmd.Flags().GetString(FlagPrometheusAddr) @@ -107,7 +102,7 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) { return } go func() { - utils.PushPrometheus("br", prometheusAddr, time.Second * 10) + utils.PushPrometheus("br", prometheusAddr, time.Second*10) }() // Initialize the main program. @@ -143,3 +138,22 @@ func GetDefaultRawClient() *raw.BackupClient { func GetDefaultContext() context.Context { return defaultContext } + +func RunServe(addr string) { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + go func() { + if err := http.ListenAndServe(addr, mux); err != nil { + log.Warn("start server failed") + } else { + log.Info("start server") + } + }() +} From e865278baea73aec422bb8cd1ef432eec316d3ad Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Fri, 1 Nov 2019 16:00:27 +0800 Subject: [PATCH 3/4] format code Signed-off-by: 5kbpers --- cmd/cmd.go | 4 ++-- pkg/restore/client.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 7ec94685a..c5596e9ea 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -93,7 +93,7 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) { return } if len(statusAddr) != 0 { - RunServe(statusAddr) + runServe(statusAddr) } prometheusAddr, e := cmd.Flags().GetString(FlagPrometheusAddr) @@ -139,7 +139,7 @@ func GetDefaultContext() context.Context { return defaultContext } -func RunServe(addr string) { +func runServe(addr string) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 29c85eda9..2ed60d239 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -213,13 +213,13 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re select { case <-rc.ctx.Done(): errCh <- nil - default : - startTime := time.Now() - err := rc.fileImporter.Import(file, encodedRules) - if err == nil { - restoreFileHistogram.Observe(time.Since(startTime).Seconds()) - } - errCh <- err + default: + startTime := time.Now() + err := rc.fileImporter.Import(fileReplica, encodedRules) + if err == nil { + restoreFileHistogram.Observe(time.Since(startTime).Seconds()) + } + errCh <- err } }) } From 75a2889b57a99d2ac9fc2f141a2c6060dbfc7dc6 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Fri, 1 Nov 2019 19:44:07 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: 5kbpers --- cmd/cmd.go | 10 ++++++---- pkg/restore/client.go | 3 ++- pkg/utils/metrics.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index c5596e9ea..010402852 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -101,9 +101,11 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) { err = e return } - go func() { - utils.PushPrometheus("br", prometheusAddr, time.Second*10) - }() + if len(prometheusAddr) == 0 { + go func() { + utils.PushPrometheus("br", prometheusAddr, time.Second*10) + }() + } // Initialize the main program. var addr string @@ -151,7 +153,7 @@ func runServe(addr string) { go func() { if err := http.ListenAndServe(addr, mux); err != nil { - log.Warn("start server failed") + log.Error("start server failed") } else { log.Info("start server") } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 2ed60d239..499f669b3 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -195,7 +195,8 @@ func (rc *Client) CreateTable(table *utils.Table) (*restore_util.RewriteRules, e func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.RewriteRules, restoreTS uint64) error { label := fmt.Sprintf("%s.%s", table.Db.Name, table.Schema.Name) log.Info("start to restore table", - zap.String("label", label), + zap.Stringer("table", table.Schema.Name), + zap.Stringer("db", table.Db.Name), zap.Array("files", files(table.Files)), zap.Reflect("rewriteRules", rewriteRules), ) diff --git a/pkg/utils/metrics.go b/pkg/utils/metrics.go index 6cfd155ef..7f8af353c 100644 --- a/pkg/utils/metrics.go +++ b/pkg/utils/metrics.go @@ -13,7 +13,7 @@ func PushPrometheus(job, addr string, interval time.Duration) { for { err := push.New(addr, job).Gatherer(prometheus.DefaultGatherer).Push() if err != nil { - log.Warn("could not push metrics to Prometheus Pushgateway") + log.Error("could not push metrics to Prometheus Pushgateway") } time.Sleep(interval) }