Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package cmd
import (
"context"
"net/http"
"net/http/pprof"
"sync"
"time"

"github.com/pingcap/br/pkg/meta"
"github.com/pingcap/br/pkg/raw"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var (
Expand All @@ -37,6 +40,8 @@ const (
FlagLogFile = "log-file"
// FlagStatusAddr is the name of status-addr flag.
FlagStatusAddr = "status-addr"
// FlagPrometheusAddr is the name of prometheus-addr flag
FlagPrometheusAddr = "prometheus-addr"
)

// AddFlags adds flags to the given cmd.
Expand All @@ -53,6 +58,8 @@ func AddFlags(cmd *cobra.Command) {
"Set the log file path. If not set, logs will output to stdout")
cmd.PersistentFlags().String(FlagStatusAddr, "localhost:6060",
"Set the HTTP listening address for the status report service. Set to empty string to disable")
cmd.PersistentFlags().String(FlagPrometheusAddr, "",
"Set the HTTP address for the prometheus metrics service. Set to empty string to disable")
cmd.MarkFlagRequired(FlagPD)
cmd.MarkFlagRequired(FlagStorage)
}
Expand Down Expand Up @@ -86,12 +93,17 @@ func Init(ctx context.Context, cmd *cobra.Command) (err error) {
return
}
if len(statusAddr) != 0 {
runServe(statusAddr)
}

prometheusAddr, e := cmd.Flags().GetString(FlagPrometheusAddr)
if e != nil {
err = e
return
}
if len(prometheusAddr) == 0 {
go func() {
if e := http.ListenAndServe(statusAddr, nil); e != nil {
log.Warn("fail to start pprof", zap.String("addr", statusAddr), zap.Error(e))
} else {
log.Info("start pprof", zap.String("addr", statusAddr))
}
utils.PushPrometheus("br", prometheusAddr, time.Second*10)
}()
}

Expand Down Expand Up @@ -128,3 +140,22 @@ func GetDefaultRawClient() *raw.BackupClient {
func GetDefaultContext() context.Context {
return defaultContext
}

func runServe(addr string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

go func() {
if err := http.ListenAndServe(addr, mux); err != nil {
log.Error("start server failed")
} else {
log.Info("start server")
}
}()
}
10 changes: 9 additions & 1 deletion pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (rc *Client) CreateTable(table *utils.Table) (*restore_util.RewriteRules, e

// RestoreTable tries to restore the data of a table.
func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.RewriteRules, restoreTS uint64) error {
label := fmt.Sprintf("%s.%s", table.Db.Name, table.Schema.Name)
log.Info("start to restore table",
zap.Stringer("table", table.Schema.Name),
zap.Stringer("db", table.Db.Name),
Expand All @@ -213,7 +214,13 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re
select {
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, encodedRules):
default:
startTime := time.Now()
err := rc.fileImporter.Import(fileReplica, encodedRules)
if err == nil {
restoreFileHistogram.Observe(time.Since(startTime).Seconds())
}
errCh <- err
}
})
}
Expand All @@ -229,6 +236,7 @@ func (rc *Client) RestoreTable(table *utils.Table, rewriteRules *restore_util.Re
)
return err
}
restoreFileCounter.WithLabelValues(label).Add(1)
}
log.Info(
"finish to restore table",
Expand Down
29 changes: 29 additions & 0 deletions pkg/restore/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package restore

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
restoreFileCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "br",
Subsystem: "restore",
Name: "restore_file",
Help: "Restore file statistic.",
}, []string{"type"})

restoreFileHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "br",
Subsystem: "restore",
Name: "region_file_seconds",
Help: "Restore file latency distributions.",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 16),
})
)

func init() {
prometheus.MustRegister(restoreFileCounter)
prometheus.MustRegister(restoreFileHistogram)
}
20 changes: 20 additions & 0 deletions pkg/utils/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package utils

import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)

// PushPrometheus pushes metrics to Prometheus Pushgateway.
func PushPrometheus(job, addr string, interval time.Duration) {
for {
err := push.New(addr, job).Gatherer(prometheus.DefaultGatherer).Push()
if err != nil {
log.Error("could not push metrics to Prometheus Pushgateway")
}
time.Sleep(interval)
}
}