diff --git a/cmd/cmd.go b/cmd/cmd.go
index 499eea6bd..010402852 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -3,14 +3,18 @@ package cmd
 import (
 	"context"
 	"net/http"
+	"net/http/pprof"
 	"sync"
+	"time"
 
 	"github.com/pingcap/br/pkg/meta"
 	"github.com/pingcap/br/pkg/raw"
+	"github.com/pingcap/br/pkg/utils"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
 )
 
 var (
@@ -37,6 +41,8 @@ const (
 	FlagLogFile = "log-file"
 	// FlagStatusAddr is the name of status-addr flag.
 	FlagStatusAddr = "status-addr"
+	// FlagPrometheusAddr is the name of prometheus-addr flag.
+	FlagPrometheusAddr = "prometheus-addr"
 )
 
 // AddFlags adds flags to the given cmd.
@@ -53,6 +59,8 @@ func AddFlags(cmd *cobra.Command) {
 		"Set the log file path. If not set, logs will output to stdout")
 	cmd.PersistentFlags().String(FlagStatusAddr, "localhost:6060",
 		"Set the HTTP listening address for the status report service. Set to empty string to disable")
+	cmd.PersistentFlags().String(FlagPrometheusAddr, "",
+		"Set the HTTP address for the prometheus metrics service. Set to empty string to disable")
 	cmd.MarkFlagRequired(FlagPD)
 	cmd.MarkFlagRequired(FlagStorage)
 }
@@ -86,12 +94,17 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) {
 			return
 		}
 		if len(statusAddr) != 0 {
+			runServe(statusAddr)
+		}
+
+		prometheusAddr, e := cmd.Flags().GetString(FlagPrometheusAddr)
+		if e != nil {
+			err = e
+			return
+		}
+		if len(prometheusAddr) != 0 {
 			go func() {
-				if e := http.ListenAndServe(statusAddr, nil); e != nil {
-					log.Warn("fail to start pprof", zap.String("addr", statusAddr), zap.Error(e))
-				} else {
-					log.Info("start pprof", zap.String("addr", statusAddr))
-				}
+				utils.PushPrometheus("br", prometheusAddr, time.Second*10)
 			}()
 		}
 
@@ -128,3 +141,26 @@ func GetDefaultRawClient() *raw.BackupClient {
 func GetDefaultContext() context.Context {
 	return defaultContext
 }
+
+// runServe starts the status HTTP server on addr in a background goroutine.
+// It serves Prometheus metrics on /metrics and the pprof handlers under
+// /debug/pprof/ from a private mux rather than http.DefaultServeMux.
+func runServe(addr string) {
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+
+	mux.HandleFunc("/debug/pprof/", pprof.Index)
+	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+
+	go func() {
+		log.Info("start server", zap.String("addr", addr))
+		// ListenAndServe blocks and always returns a non-nil error, so there
+		// is no success branch to handle after it returns.
+		if err := http.ListenAndServe(addr, mux); err != nil {
+			log.Error("start server failed", zap.String("addr", addr), zap.Error(err))
+		}
+	}()
+}
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index 9f1020a40..499f669b3 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -193,6 +193,7 @@ func (rc *Client) CreateTable(table *utils.Table) (*restore_util.RewriteRules, e
 
 // RestoreTable tries to restore the data of a table.
 func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.RewriteRules, restoreTS uint64) error {
+	label := fmt.Sprintf("%s.%s", table.Db.Name, table.Schema.Name)
 	log.Info("start to restore table",
 		zap.Stringer("table", table.Schema.Name),
 		zap.Stringer("db", table.Db.Name),
@@ -213,7 +214,13 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re
 				select {
 				case <-rc.ctx.Done():
 					errCh <- nil
-				case errCh <- rc.fileImporter.Import(fileReplica, encodedRules):
+				default:
+					startTime := time.Now()
+					err := rc.fileImporter.Import(fileReplica, encodedRules)
+					if err == nil {
+						restoreFileHistogram.Observe(time.Since(startTime).Seconds())
+					}
+					errCh <- err
 				}
 			})
 	}
@@ -229,6 +236,7 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re
 			)
 			return err
 		}
+		restoreFileCounter.WithLabelValues(label).Add(1)
 	}
 	log.Info(
 		"finish to restore table",
diff --git a/pkg/restore/metrics.go b/pkg/restore/metrics.go
new file mode 100644
index 000000000..a6e5be45f
--- /dev/null
+++ b/pkg/restore/metrics.go
@@ -0,0 +1,29 @@
+package restore
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	restoreFileCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "br",
+			Subsystem: "restore",
+			Name:      "restore_file",
+			Help:      "Restore file statistic.",
+		}, []string{"type"})
+
+	restoreFileHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "br",
+			Subsystem: "restore",
+			Name:      "region_file_seconds",
+			Help:      "Restore file latency distributions.",
+			Buckets:   prometheus.ExponentialBuckets(0.05, 2, 16),
+		})
+)
+
+func init() {
+	prometheus.MustRegister(restoreFileCounter)
+	prometheus.MustRegister(restoreFileHistogram)
+}
diff --git a/pkg/utils/metrics.go b/pkg/utils/metrics.go
new file mode 100644
index 000000000..7f8af353c
--- /dev/null
+++ b/pkg/utils/metrics.go
@@ -0,0 +1,27 @@
+package utils
+
+import (
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/push"
+	"go.uber.org/zap"
+)
+
+// PushPrometheus pushes the metrics of the default Prometheus gatherer to the
+// Pushgateway at addr under the given job name, once every interval.
+//
+// It loops forever and never returns, so callers must run it in its own
+// goroutine. A failed push is logged and retried after the next interval.
+func PushPrometheus(job, addr string, interval time.Duration) {
+	for {
+		err := push.New(addr, job).Gatherer(prometheus.DefaultGatherer).Push()
+		if err != nil {
+			// Include the target and the cause; a bare message is not actionable.
+			log.Error("could not push metrics to Prometheus Pushgateway",
+				zap.String("addr", addr), zap.Error(err))
+		}
+		time.Sleep(interval)
+	}
+}