Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto
pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/storage/rules/rules.pb.go: pkg/storage/rules/rules.proto
pkg/util/usertracker/usertracker.pb.go: pkg/util/usertracker/usertracker.proto
all: $(UPTODATE_FILES)
test: protos
mod-check: protos
Expand Down
2 changes: 2 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.

ConfigDB db.Config `yaml:"configdb,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
ConfigStore config_client.Config `yaml:"config_store,omitempty"`
Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"`
Expand Down Expand Up @@ -100,6 +101,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {

c.Ruler.RegisterFlags(f)
c.ConfigStore.RegisterFlags(f)
c.ConfigDB.RegisterFlags(f)
c.Alertmanager.RegisterFlags(f)

// These don't seem to have a home.
Expand Down
21 changes: 3 additions & 18 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/configs/api"
config_client "github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/configs/db"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester"
Expand Down Expand Up @@ -320,26 +319,12 @@ func (t *Cortex) initRuler(cfg *Config) (err error) {
cfg.Ruler.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
queryable, engine := querier.New(cfg.Querier, t.distributor, t.store)

rulesAPI, err := config_client.New(cfg.ConfigStore)
if err != nil {
return err
}

t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, rulesAPI)
t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor)
if err != nil {
return
}

// Only serve the API for setting & getting rules configs if we're not
// serving configs from the configs API. Allows for smoother
// migration. See https://github.com/cortexproject/cortex/issues/619
if cfg.ConfigStore.ConfigsAPIURL.URL == nil {
a, err := ruler.NewAPIFromConfig(cfg.ConfigStore.DBConfig)
if err != nil {
return err
}
a.RegisterRoutes(t.server.HTTP)
}
t.ruler.RegisterRoutes(t.server.HTTP)

t.server.HTTP.Handle("/ruler_ring", t.ruler)
return
Expand All @@ -351,7 +336,7 @@ func (t *Cortex) stopRuler() error {
}

func (t *Cortex) initConfigs(cfg *Config) (err error) {
t.configDB, err = db.New(cfg.ConfigStore.DBConfig)
t.configDB, err = db.New(cfg.ConfigDB)
if err != nil {
return
}
Expand Down
259 changes: 196 additions & 63 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
@@ -1,118 +1,251 @@
package ruler

import (
"database/sql"
"encoding/json"
"fmt"
"errors"
"io/ioutil"
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/prometheus/pkg/rulefmt"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/configs/db"
store "github.com/cortexproject/cortex/pkg/storage/rules"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
)

// API implements the configs api.
type API struct {
db db.DB
http.Handler
}

// NewAPIFromConfig makes a new API from our database config.
func NewAPIFromConfig(cfg db.Config) (*API, error) {
db, err := db.New(cfg)
if err != nil {
return nil, err
}
return NewAPI(db), nil
}

// NewAPI creates a new API.
func NewAPI(db db.DB) *API {
a := &API{db: db}
r := mux.NewRouter()
a.RegisterRoutes(r)
a.Handler = r
return a
}
var (
// ErrNoNamespace signals the requested namespace does not exist
ErrNoNamespace = errors.New("a namespace must be provided in the url")
// ErrNoGroupName signals a group name url parameter was not found
ErrNoGroupName = errors.New("a matching group name must be provided in the url")
// ErrNoRuleGroups signals the rule group requested does not exist
ErrNoRuleGroups = errors.New("no rule groups found")
// ErrNoUserID is returned when no user ID is provided
ErrNoUserID = errors.New("no id provided")
)

// RegisterRoutes registers the configs API HTTP routes with the provided Router.
func (a *API) RegisterRoutes(r *mux.Router) {
func (r *Ruler) RegisterRoutes(router *mux.Router) {
// If no store is set do not register routes in the api. This will only be the case if the configdb
// is used to store rules
if r.store == nil {
return
}
for _, route := range []struct {
name, method, path string
handler http.HandlerFunc
}{
{"get_rules", "GET", "/api/prom/rules", a.getConfig},
{"cas_rules", "POST", "/api/prom/rules", a.casConfig},
{"list_rules", "GET", "/api/prom/rules", r.listRules},
{"getRuleNamespace", "GET", "/api/prom/rules/{namespace}", r.listRules},
{"get_rulegroup", "GET", "/api/prom/rules/{namespace}/{groupName}", r.getRuleGroup},
{"set_rulegroup", "POST", "/api/prom/rules/{namespace}", r.createRuleGroup},
{"delete_rulegroup", "DELETE", "/api/prom/rules/{namespace}/{groupName}", r.deleteRuleGroup},
} {
r.Handle(route.path, route.handler).Methods(route.method).Name(route.name)
router.Handle(route.path, route.handler).Methods(route.method).Name(route.name)
}
}

// getConfig returns the request configuration.
func (a *API) getConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)
userID, _, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
logger := util.WithContext(r.Context(), util.Logger)

cfg, err := a.db.GetRulesConfig(r.Context(), userID)
if err == sql.ErrNoRows {
http.Error(w, "No configuration", http.StatusNotFound)
if userID == "" {
http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized)
return
}

options := store.RuleStoreConditions{
UserID: userID,
}

vars := mux.Vars(req)

namespace := vars["namespace"]
if namespace != "" {
level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
options.Namespace = namespace
}

level.Debug(logger).Log("msg", "retrieving rule groups from rule store", "userID", userID)
rgs, err := r.store.ListRuleGroups(req.Context(), options)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

level.Debug(logger).Log("msg", "retrieved rule groups from rule store", "userID", userID, "num_namespaces", len(rgs))

if len(rgs) == 0 {
level.Info(logger).Log("msg", "no rule groups found", "userID", userID)
http.Error(w, ErrNoRuleGroups.Error(), http.StatusNotFound)
return
} else if err != nil {
level.Error(logger).Log("msg", "error getting config", "err", err)
}

formatted := rgs.Formatted(userID)

d, err := yaml.Marshal(&formatted)
if err != nil {
level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(cfg); err != nil {
level.Error(logger).Log("msg", "error encoding config", "err", err)
w.Header().Set("Content-Type", "application/yaml")
if _, err := w.Write(d); err != nil {
level.Error(logger).Log("msg", "error writing yaml response", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

type configUpdateRequest struct {
OldConfig configs.RulesConfig `json:"old_config"`
NewConfig configs.RulesConfig `json:"new_config"`
func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)

userID, _, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

if userID == "" {
http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized)
return
}

vars := mux.Vars(req)
namespace, exists := vars["namespace"]
if !exists {
http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized)
return
}

groupName, exists := vars["groupName"]
if !exists {
http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized)
return
}

rg, err := r.store.GetRuleGroup(req.Context(), userID, namespace, groupName)
if err != nil {
if err == store.ErrGroupNotFound {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

d, err := yaml.Marshal(&rg)
if err != nil {
level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/yaml")
if _, err := w.Write(d); err != nil {
level.Error(logger).Log("msg", "error writing yaml response", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (a *API) casConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)
userID, _, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
logger := util.WithContext(r.Context(), util.Logger)

var updateReq configUpdateRequest
if err := json.NewDecoder(r.Body).Decode(&updateReq); err != nil {
level.Error(logger).Log("msg", "error decoding json body", "err", err)
if userID == "" {
http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized)
return
}

vars := mux.Vars(req)

namespace := vars["namespace"]
if namespace == "" {
level.Error(logger).Log("err", "no namespace provided with rule group")
http.Error(w, ErrNoNamespace.Error(), http.StatusBadRequest)
return
}

payload, err := ioutil.ReadAll(req.Body)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if _, err = updateReq.NewConfig.Parse(); err != nil {
level.Error(logger).Log("msg", "invalid rules", "err", err)
http.Error(w, fmt.Sprintf("Invalid rules: %v", err), http.StatusBadRequest)
level.Debug(logger).Log("msg", "attempting to unmarshal rulegroup", "userID", userID, "group", string(payload))

rg := rulefmt.RuleGroup{}
err = yaml.Unmarshal(payload, &rg)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

updated, err := a.db.SetRulesConfig(r.Context(), userID, updateReq.OldConfig, updateReq.NewConfig)
errs := store.ValidateRuleGroup(rg)
if len(errs) > 0 {
level.Error(logger).Log("err", err.Error())
http.Error(w, errs[0].Error(), http.StatusBadRequest)
return
}

err = r.store.SetRuleGroup(req.Context(), userID, namespace, rg)
if err != nil {
level.Error(logger).Log("msg", "error storing config", "err", err)
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !updated {
http.Error(w, "Supplied configuration doesn't match current configuration", http.StatusConflict)

// Return a status accepted because the rule has been stored and queued for polling, but is not currently active
w.WriteHeader(http.StatusAccepted)
}

func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) {
logger := util.WithContext(req.Context(), util.Logger)
userID, _, err := user.ExtractOrgIDFromHTTPRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

if userID == "" {
http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusNoContent)

vars := mux.Vars(req)
namespace, exists := vars["namespace"]
if !exists {
http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized)
return
}

groupName, exists := vars["groupName"]
if !exists {
http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized)
return
}

err = r.store.DeleteRuleGroup(req.Context(), userID, namespace, groupName)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Return a status accepted because the rule has been stored and queued for polling, but is not currently active
w.WriteHeader(http.StatusAccepted)
}
Loading