From beca6a4fbf3c1862f2b4848bca49e671a456713d Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Wed, 18 Sep 2019 14:51:26 -0400 Subject: [PATCH 1/3] refactor alertmanager storage and add static file alert store and add integration test for alertmanager Signed-off-by: Jacob Lisi --- CHANGELOG.md | 2 + docs/configuration/config-file-reference.md | 21 +- integration/alertmanager_test.go | 47 ++ integration/backward_compatibility_test.go | 4 +- integration/configs.go | 13 + integration/e2ecortex/client.go | 51 +- integration/e2ecortex/services.go | 18 + integration/ingester_flush_test.go | 2 +- integration/ingester_hand_over_test.go | 2 +- pkg/alertmanager/alertmanager.go | 11 +- pkg/alertmanager/alerts/alerts.pb.go | 801 ++++++++++++++++++++ pkg/alertmanager/alerts/alerts.proto | 20 + pkg/alertmanager/alerts/configdb/store.go | 60 ++ pkg/alertmanager/alerts/local/store.go | 78 ++ pkg/alertmanager/multitenant.go | 170 ++--- pkg/alertmanager/multitenant_test.go | 121 +++ pkg/alertmanager/storage.go | 47 ++ pkg/cortex/cortex.go | 3 - pkg/cortex/modules.go | 2 +- 19 files changed, 1368 insertions(+), 105 deletions(-) create mode 100644 integration/alertmanager_test.go create mode 100644 pkg/alertmanager/alerts/alerts.pb.go create mode 100644 pkg/alertmanager/alerts/alerts.proto create mode 100644 pkg/alertmanager/alerts/configdb/store.go create mode 100644 pkg/alertmanager/alerts/local/store.go create mode 100644 pkg/alertmanager/multitenant_test.go create mode 100644 pkg/alertmanager/storage.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a20f735f38..13b644078f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125 * [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. 
Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040 * [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034 * [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893 @@ -13,6 +14,7 @@ * `--experimental.distributor.user-subring-size` * [FEATURE] Added flag `-experimental.ruler.enable-api` to enable the ruler api which implements the Prometheus API `/api/v1/rules` and `/api/v1/alerts` endpoints under the configured `-http.prefix`. #1999 * [FEATURE] Added sharding support to compactor when using the experimental TSDB blocks storage. #2113 +* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125 * [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093 * [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. 
#2023 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 351d3b73543..db97324b6dd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -86,11 +86,6 @@ Supported contents and default values of the config file: # and used by the 'configs' service to expose APIs to manage them. [configdb: ] -# The configstore_config configures the config database storing rules and -# alerts, and is used by the Cortex alertmanager. -# The CLI flags prefix for this block config is: alertmanager -[config_store: ] - # The alertmanager_config configures the Cortex alertmanager. [alertmanager: ] @@ -821,6 +816,22 @@ externalurl: # Root of URL to generate if config is http://internal.monitor # CLI flag: -alertmanager.configs.auto-webhook-root [autowebhookroot: | default = ""] + +store: + # Type of backend to use to store alertmanager configs. Supported values are: + # "configdb", "local". + # CLI flag: -alertmanager.storage.type + [type: | default = "configdb"] + + # The configstore_config configures the config database storing rules and + # alerts, and is used by the Cortex alertmanager. + # The CLI flags prefix for this block config is: alertmanager + [configdb: ] + + local: + # Path at which alertmanager configurations are stored. 
+ # CLI flag: -alertmanager.storage.local.path + [path: | default = ""] ``` ## `table_manager_config` diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go new file mode 100644 index 00000000000..29331e9e293 --- /dev/null +++ b/integration/alertmanager_test.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestAlertmanager(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + alertmanagerDir := filepath.Join(s.SharedDir(), "alertmanager_configs") + require.NoError(t, os.Mkdir(alertmanagerDir, os.ModePerm)) + + require.NoError(t, ioutil.WriteFile( + filepath.Join(alertmanagerDir, "user-1.yaml"), + []byte(cortexAlertmanagerUserConfigYaml), + os.ModePerm), + ) + + alertmanager := e2ecortex.NewAlertmanager("alertmanager", AlertmanagerConfigs, "") + require.NoError(t, s.StartAndWaitReady(alertmanager)) + require.NoError(t, alertmanager.WaitSumMetric("cortex_alertmanager_configs", 1)) + + c, err := e2ecortex.NewClient("", "", alertmanager.Endpoint(80), "user-1") + require.NoError(t, err) + + cfg, err := c.GetAlertmanagerConfig(context.Background()) + require.NoError(t, err) + + // Ensure the returned status config matches alertmanager_test_fixtures/user-1.yaml + require.NotNil(t, cfg) + require.Equal(t, "example_receiver", cfg.Route.Receiver) + require.Len(t, cfg.Route.GroupByStr, 1) + require.Equal(t, "example_groupby", cfg.Route.GroupByStr[0]) + require.Len(t, cfg.Receivers, 1) + require.Equal(t, "example_receiver", cfg.Receivers[0].Name) +} diff --git a/integration/backward_compatibility_test.go b/integration/backward_compatibility_test.go index 32b6d67b21e..7b5dea85a1d 100644 --- a/integration/backward_compatibility_test.go +++ 
b/integration/backward_compatibility_test.go @@ -48,7 +48,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { now := time.Now() series, expectedVector := generateSeries("series_1", now) - c, err := e2ecortex.NewClient(distributor.Endpoint(80), "", "user-1") + c, err := e2ecortex.NewClient(distributor.Endpoint(80), "", "", "user-1") require.NoError(t, err) res, err := c.Push(series) @@ -74,7 +74,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512)) // Query the series - c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "user-1") + c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1") require.NoError(t, err) result, err := c.Query("series_1", now) diff --git a/integration/configs.go b/integration/configs.go index f7aede0af31..8514fbd2391 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -22,9 +22,22 @@ const ( prefix: cortex_chunks_ period: 168h0m0s ` + + cortexAlertmanagerUserConfigYaml = `route: + receiver: "example_receiver" + group_by: ["example_groupby"] +receivers: + - name: "example_receiver" +` ) var ( + AlertmanagerConfigs = map[string]string{ + "-alertmanager.storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), + "-alertmanager.storage.type": "local", + "-alertmanager.web.external-url": "http://localhost/api/prom", + } + BlocksStorage = map[string]string{ "-store.engine": "tsdb", "-experimental.tsdb.backend": "s3", diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 39293649641..d39be4a5966 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -3,20 +3,24 @@ package e2ecortex import ( "bytes" "context" + "encoding/json" "fmt" "net/http" "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + alertConfig "github.com/prometheus/alertmanager/config" promapi 
"github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + yaml "gopkg.in/yaml.v2" ) // Client is a client used to interact with Cortex in integration tests type Client struct { + alertmanagerClient promapi.Client distributorAddress string timeout time.Duration httpClient *http.Client @@ -25,7 +29,7 @@ type Client struct { } // NewClient makes a new Cortex client -func NewClient(distributorAddress string, querierAddress string, orgID string) (*Client, error) { +func NewClient(distributorAddress string, querierAddress string, alertmanagerAddress string, orgID string) (*Client, error) { // Create querier API client querierAPIClient, err := promapi.NewClient(promapi.Config{ Address: "http://" + querierAddress + "/api/prom", @@ -43,6 +47,17 @@ func NewClient(distributorAddress string, querierAddress string, orgID string) ( orgID: orgID, } + if alertmanagerAddress != "" { + alertmanagerAPIClient, err := promapi.NewClient(promapi.Config{ + Address: "http://" + alertmanagerAddress + "/api/prom", + RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport}, + }) + if err != nil { + return nil, err + } + c.alertmanagerClient = alertmanagerAPIClient + } + return c, nil } @@ -95,3 +110,37 @@ func (r *addOrgIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, err return r.next.RoundTrip(req) } + +// ServerStatus represents a Alertmanager status response +// TODO: Upgrade to Alertmanager v0.20.0+ and utilize vendored structs +type ServerStatus struct { + Data struct { + ConfigYaml string `json:"configYAML"` + } `json:"data"` +} + +// GetAlertmanagerConfig gets the status of an alertmanager instance +func (c *Client) GetAlertmanagerConfig(ctx context.Context) (*alertConfig.Config, error) { + u := c.alertmanagerClient.URL("/api/v1/status", nil) + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { 
+ return nil, fmt.Errorf("error creating request: %v", err) + } + + _, body, _, err := c.alertmanagerClient.Do(ctx, req) // Ignoring warnings. + if err != nil { + return nil, err + } + + var ss *ServerStatus + err = json.Unmarshal(body, &ss) + if err != nil { + return nil, err + } + + cfg := &alertConfig.Config{} + err = yaml.Unmarshal([]byte(ss.Data.ConfigYaml), cfg) + + return cfg, err +} diff --git a/integration/e2ecortex/services.go b/integration/e2ecortex/services.go index c19597bbda3..5ea611e1127 100644 --- a/integration/e2ecortex/services.go +++ b/integration/e2ecortex/services.go @@ -120,3 +120,21 @@ func NewSingleBinary(name string, flags map[string]string, image string, httpPor otherPorts..., ) } + +func NewAlertmanager(name string, flags map[string]string, image string) *e2e.HTTPService { + if image == "" { + image = GetDefaultImage() + } + + return e2e.NewHTTPService( + name, + image, + e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ + "-target": "alertmanager", + "-log.level": "warn", + }, flags))...), + // The alertmanager doesn't expose a readiness probe, so we just check if the / returns 404 + e2e.NewReadinessProbe(80, "/", 404), + 80, + ) +} diff --git a/integration/ingester_flush_test.go b/integration/ingester_flush_test.go index 5f8eac4a57d..62f0d93848e 100644 --- a/integration/ingester_flush_test.go +++ b/integration/ingester_flush_test.go @@ -46,7 +46,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) { require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512)) require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512)) - c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "user-1") + c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1") require.NoError(t, err) // Push some series to Cortex. 
diff --git a/integration/ingester_hand_over_test.go b/integration/ingester_hand_over_test.go index 085fb8dcabd..4d0edfd2c8d 100644 --- a/integration/ingester_hand_over_test.go +++ b/integration/ingester_hand_over_test.go @@ -56,7 +56,7 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t require.NoError(t, distributor.WaitSumMetric("cortex_ring_tokens_total", 512)) require.NoError(t, querier.WaitSumMetric("cortex_ring_tokens_total", 512)) - c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "user-1") + c, err := e2ecortex.NewClient(distributor.Endpoint(80), querier.Endpoint(80), "", "user-1") require.NoError(t, err) // Push some series to Cortex. diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index 6371de0b3bb..943f0839a32 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -191,8 +191,15 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config) error { am.api.Update(conf, func(_ model.LabelSet) {}) - am.inhibitor.Stop() - am.dispatcher.Stop() + // Ensure inhibitor is set before being called + if am.inhibitor != nil { + am.inhibitor.Stop() + } + + // Ensure dispatcher is set before being called + if am.dispatcher != nil { + am.dispatcher.Stop() + } am.inhibitor = inhibit.NewInhibitor(am.alerts, conf.InhibitRules, am.marker, log.With(am.logger, "component", "inhibitor")) diff --git a/pkg/alertmanager/alerts/alerts.pb.go b/pkg/alertmanager/alerts/alerts.pb.go new file mode 100644 index 00000000000..016c42a4162 --- /dev/null +++ b/pkg/alertmanager/alerts/alerts.pb.go @@ -0,0 +1,801 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: alerts.proto + +package alerts + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. 
+var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type AlertConfigDesc struct { + User string `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` + RawConfig string `protobuf:"bytes,2,opt,name=raw_config,json=rawConfig,proto3" json:"raw_config,omitempty"` + Templates []*TemplateDesc `protobuf:"bytes,3,rep,name=templates,proto3" json:"templates,omitempty"` +} + +func (m *AlertConfigDesc) Reset() { *m = AlertConfigDesc{} } +func (*AlertConfigDesc) ProtoMessage() {} +func (*AlertConfigDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_20493709c38b81dc, []int{0} +} +func (m *AlertConfigDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AlertConfigDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AlertConfigDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AlertConfigDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_AlertConfigDesc.Merge(m, src) +} +func (m *AlertConfigDesc) XXX_Size() int { + return m.Size() +} +func (m *AlertConfigDesc) XXX_DiscardUnknown() { + xxx_messageInfo_AlertConfigDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_AlertConfigDesc proto.InternalMessageInfo + +func (m *AlertConfigDesc) GetUser() string { + if m != nil { + return m.User + } + return "" +} + +func (m *AlertConfigDesc) GetRawConfig() string { + if m != nil { + return m.RawConfig + } + return "" +} + +func (m *AlertConfigDesc) GetTemplates() []*TemplateDesc { + if m != nil { + return m.Templates + } + return nil +} + +type 
TemplateDesc struct { + Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"` + Body string `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` +} + +func (m *TemplateDesc) Reset() { *m = TemplateDesc{} } +func (*TemplateDesc) ProtoMessage() {} +func (*TemplateDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_20493709c38b81dc, []int{1} +} +func (m *TemplateDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TemplateDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TemplateDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TemplateDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_TemplateDesc.Merge(m, src) +} +func (m *TemplateDesc) XXX_Size() int { + return m.Size() +} +func (m *TemplateDesc) XXX_DiscardUnknown() { + xxx_messageInfo_TemplateDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_TemplateDesc proto.InternalMessageInfo + +func (m *TemplateDesc) GetFilename() string { + if m != nil { + return m.Filename + } + return "" +} + +func (m *TemplateDesc) GetBody() string { + if m != nil { + return m.Body + } + return "" +} + +func init() { + proto.RegisterType((*AlertConfigDesc)(nil), "alerts.AlertConfigDesc") + proto.RegisterType((*TemplateDesc)(nil), "alerts.TemplateDesc") +} + +func init() { proto.RegisterFile("alerts.proto", fileDescriptor_20493709c38b81dc) } + +var fileDescriptor_20493709c38b81dc = []byte{ + // 253 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0xcc, 0x49, 0x2d, + 0x2a, 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, 0xf0, 0xa4, 0x74, 0xd3, 0x33, + 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, 0xd2, + 0x49, 0xa5, 0x69, 0x60, 0x1e, 0x98, 0x03, 0x66, 0x41, 0xb4, 
0x29, 0x55, 0x70, 0xf1, 0x3b, 0x82, + 0x34, 0x3a, 0xe7, 0xe7, 0xa5, 0x65, 0xa6, 0xbb, 0xa4, 0x16, 0x27, 0x0b, 0x09, 0x71, 0xb1, 0x94, + 0x16, 0xa7, 0x16, 0x49, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x42, 0xb2, 0x5c, 0x5c, + 0x45, 0x89, 0xe5, 0xf1, 0xc9, 0x60, 0x55, 0x12, 0x4c, 0x60, 0x19, 0xce, 0xa2, 0xc4, 0x72, 0x88, + 0x36, 0x21, 0x23, 0x2e, 0xce, 0x92, 0xd4, 0xdc, 0x82, 0x9c, 0xc4, 0x92, 0xd4, 0x62, 0x09, 0x66, + 0x05, 0x66, 0x0d, 0x6e, 0x23, 0x11, 0x3d, 0xa8, 0xf3, 0x42, 0xa0, 0x12, 0x20, 0xb3, 0x83, 0x10, + 0xca, 0x94, 0xec, 0xb8, 0x78, 0x90, 0xa5, 0x84, 0xa4, 0xb8, 0x38, 0xd2, 0x32, 0x73, 0x52, 0xf3, + 0x12, 0x73, 0x53, 0xa1, 0x56, 0xc3, 0xf9, 0x20, 0x27, 0x25, 0xe5, 0xa7, 0x54, 0x42, 0x2d, 0x06, + 0xb3, 0x9d, 0x4c, 0x2e, 0x3c, 0x94, 0x63, 0xb8, 0xf1, 0x50, 0x8e, 0xe1, 0xc3, 0x43, 0x39, 0xc6, + 0x86, 0x47, 0x72, 0x8c, 0x2b, 0x1e, 0xc9, 0x31, 0x9e, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, + 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x2f, 0x1e, 0xc9, 0x31, 0x7c, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, + 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xbd, 0x6d, 0x0c, + 0x08, 0x00, 0x00, 0xff, 0xff, 0xbd, 0x90, 0x9e, 0xed, 0x3d, 0x01, 0x00, 0x00, +} + +func (this *AlertConfigDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*AlertConfigDesc) + if !ok { + that2, ok := that.(AlertConfigDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.User != that1.User { + return false + } + if this.RawConfig != that1.RawConfig { + return false + } + if len(this.Templates) != len(that1.Templates) { + return false + } + for i := range this.Templates { + if !this.Templates[i].Equal(that1.Templates[i]) { + return false + } + } + return true +} +func (this *TemplateDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TemplateDesc) + if !ok { + 
that2, ok := that.(TemplateDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Filename != that1.Filename { + return false + } + if this.Body != that1.Body { + return false + } + return true +} +func (this *AlertConfigDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&alerts.AlertConfigDesc{") + s = append(s, "User: "+fmt.Sprintf("%#v", this.User)+",\n") + s = append(s, "RawConfig: "+fmt.Sprintf("%#v", this.RawConfig)+",\n") + if this.Templates != nil { + s = append(s, "Templates: "+fmt.Sprintf("%#v", this.Templates)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TemplateDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&alerts.TemplateDesc{") + s = append(s, "Filename: "+fmt.Sprintf("%#v", this.Filename)+",\n") + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringAlerts(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *AlertConfigDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AlertConfigDesc) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.User) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintAlerts(dAtA, i, uint64(len(m.User))) + i += copy(dAtA[i:], m.User) + } + if len(m.RawConfig) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintAlerts(dAtA, i, uint64(len(m.RawConfig))) + i += copy(dAtA[i:], m.RawConfig) + } + if len(m.Templates) > 0 { + for _, msg := range 
m.Templates { + dAtA[i] = 0x1a + i++ + i = encodeVarintAlerts(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *TemplateDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TemplateDesc) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Filename) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintAlerts(dAtA, i, uint64(len(m.Filename))) + i += copy(dAtA[i:], m.Filename) + } + if len(m.Body) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintAlerts(dAtA, i, uint64(len(m.Body))) + i += copy(dAtA[i:], m.Body) + } + return i, nil +} + +func encodeVarintAlerts(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *AlertConfigDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.User) + if l > 0 { + n += 1 + l + sovAlerts(uint64(l)) + } + l = len(m.RawConfig) + if l > 0 { + n += 1 + l + sovAlerts(uint64(l)) + } + if len(m.Templates) > 0 { + for _, e := range m.Templates { + l = e.Size() + n += 1 + l + sovAlerts(uint64(l)) + } + } + return n +} + +func (m *TemplateDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Filename) + if l > 0 { + n += 1 + l + sovAlerts(uint64(l)) + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovAlerts(uint64(l)) + } + return n +} + +func sovAlerts(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozAlerts(x uint64) (n int) { + return sovAlerts(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *AlertConfigDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AlertConfigDesc{`, + `User:` + 
fmt.Sprintf("%v", this.User) + `,`, + `RawConfig:` + fmt.Sprintf("%v", this.RawConfig) + `,`, + `Templates:` + strings.Replace(fmt.Sprintf("%v", this.Templates), "TemplateDesc", "TemplateDesc", 1) + `,`, + `}`, + }, "") + return s +} +func (this *TemplateDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TemplateDesc{`, + `Filename:` + fmt.Sprintf("%v", this.Filename) + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func valueToStringAlerts(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *AlertConfigDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlertConfigDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlertConfigDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field User", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAlerts + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAlerts + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.User 
= string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RawConfig", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAlerts + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAlerts + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RawConfig = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Templates", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthAlerts + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthAlerts + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Templates = append(m.Templates, &TemplateDesc{}) + if err := m.Templates[len(m.Templates)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAlerts(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAlerts + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAlerts + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TemplateDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire 
uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TemplateDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TemplateDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filename", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAlerts + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAlerts + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filename = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlerts + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAlerts + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAlerts + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAlerts(dAtA[iNdEx:]) + if err != nil { + 
return err + } + if skippy < 0 { + return ErrInvalidLengthAlerts + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAlerts + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipAlerts(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAlerts + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAlerts + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAlerts + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthAlerts + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthAlerts + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAlerts + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipAlerts(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthAlerts + } + } + return iNdEx, nil + case 4: + return 
iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthAlerts = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowAlerts = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/alertmanager/alerts/alerts.proto b/pkg/alertmanager/alerts/alerts.proto new file mode 100644 index 00000000000..8626ec981c5 --- /dev/null +++ b/pkg/alertmanager/alerts/alerts.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package alerts; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message AlertConfigDesc { + string user = 1; + string raw_config = 2; + + repeated TemplateDesc templates = 3; +} + +message TemplateDesc { + string filename = 1; + string body = 2; +} \ No newline at end of file diff --git a/pkg/alertmanager/alerts/configdb/store.go b/pkg/alertmanager/alerts/configdb/store.go new file mode 100644 index 00000000000..786ca29a461 --- /dev/null +++ b/pkg/alertmanager/alerts/configdb/store.go @@ -0,0 +1,60 @@ +package configdb + +import ( + "context" + + "github.com/cortexproject/cortex/pkg/alertmanager/alerts" + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/configs/client" +) + +// Store is a concrete implementation of RuleStore that sources rules from the config service +type Store struct { + configClient client.Client + since configs.ID + alertConfigs map[string]alerts.AlertConfigDesc +} + +// NewStore constructs a Store +func NewStore(c client.Client) *Store { + return &Store{ + configClient: c, + since: 0, + alertConfigs: make(map[string]alerts.AlertConfigDesc), + } +} + +// ListAlertConfigs implements RuleStore +func (c *Store) ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) { + + configs, err := c.configClient.GetAlerts(ctx, c.since) + + if 
err != nil { + return nil, err + } + + for user, cfg := range configs.Configs { + if cfg.IsDeleted() { + delete(c.alertConfigs, user) + continue + } + + var templates []*alerts.TemplateDesc + for fn, template := range cfg.Config.TemplateFiles { + templates = append(templates, &alerts.TemplateDesc{ + Filename: fn, + Body: template, + }) + } + + c.alertConfigs[user] = alerts.AlertConfigDesc{ + User: user, + RawConfig: cfg.Config.AlertmanagerConfig, + Templates: templates, + } + } + + c.since = configs.GetLatestConfigID() + + return c.alertConfigs, nil +} diff --git a/pkg/alertmanager/alerts/local/store.go b/pkg/alertmanager/alerts/local/store.go new file mode 100644 index 00000000000..df8010e6d5c --- /dev/null +++ b/pkg/alertmanager/alerts/local/store.go @@ -0,0 +1,78 @@ +package local + +import ( + "context" + "flag" + "io/ioutil" + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + "github.com/prometheus/alertmanager/config" + + "github.com/cortexproject/cortex/pkg/alertmanager/alerts" +) + +// StoreConfig configures a static file alertmanager store +type StoreConfig struct { + Path string `yaml:"path"` +} + +// RegisterFlags registers flags related to the alertmanager file store +func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Path, "alertmanager.storage.local.path", "", "Path at which alertmanager configurations are stored.") +} + +// Store is used to load user alertmanager configs from a local disk +type Store struct { + cfg StoreConfig +} + +// NewStore returns a new file alert store. +func NewStore(cfg StoreConfig) (*Store, error) { + return &Store{cfg}, nil +} + +// ListAlertConfigs returns a list of each users alertmanager config. 
+func (f *Store) ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) { + configs := map[string]alerts.AlertConfigDesc{} + err := filepath.Walk(f.cfg.Path, func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrap(err, "unable to walk file path") + } + + // Ignore files that are directories or not yaml files + ext := filepath.Ext(info.Name()) + if info.IsDir() || (ext != ".yml" && ext != ".yaml") { + return nil + } + + // Ensure the file is a valid Alertmanager Config. + _, err = config.LoadFile(path) + if err != nil { + return errors.Wrap(err, "unable to load file "+path) + } + + // Load the file to be returned by the store. + content, err := ioutil.ReadFile(path) + if err != nil { + return errors.Wrap(err, "unable to read file "+path) + } + + // The file name must correspond to the user tenant ID + user := strings.TrimSuffix(info.Name(), ext) + + configs[user] = alerts.AlertConfigDesc{ + User: user, + RawConfig: string(content), + } + return nil + }) + + if err != nil { + return nil, err + } + + return configs, nil +} diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 69df130f2e9..60b8271a46e 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -21,8 +21,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/configs" - configs_client "github.com/cortexproject/cortex/pkg/configs/client" + "github.com/cortexproject/cortex/pkg/alertmanager/alerts" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -67,11 +66,11 @@ const ( ) var ( - totalConfigs = prometheus.NewGauge(prometheus.GaugeOpts{ + totalConfigs = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "cortex", Name: "alertmanager_configs", Help: "How many configs the multitenant alertmanager knows about.", - }) + }, []string{"status"}) statusTemplate 
*template.Template ) @@ -101,6 +100,8 @@ type MultitenantAlertmanagerConfig struct { FallbackConfigFile string AutoWebhookRoot string + + Store AlertStoreConfig } const defaultClusterAddr = "0.0.0.0:9094" @@ -120,6 +121,8 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.ClusterAdvertiseAddr, "cluster.advertise-address", "", "Explicit address to advertise in cluster.") f.Var(&cfg.Peers, "cluster.peer", "Initial peers (may be repeated).") f.DurationVar(&cfg.PeerTimeout, "cluster.peer-timeout", time.Second*15, "Time to wait between peers to send notifications.") + + cfg.Store.RegisterFlags(f) } // A MultitenantAlertmanager manages Alertmanager instances for multiple @@ -127,7 +130,7 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { type MultitenantAlertmanager struct { cfg *MultitenantAlertmanagerConfig - configsAPI configs_client.Client + store AlertStore // The fallback config is stored as a string and parsed every time it's needed // because we mutate the parsed results and don't want those changes to take @@ -135,13 +138,12 @@ type MultitenantAlertmanager struct { fallbackConfig string // All the organization configurations that we have. Only used for instrumentation. - cfgs map[string]configs.Config + cfgs map[string]alerts.AlertConfigDesc alertmanagersMtx sync.Mutex alertmanagers map[string]*Alertmanager - latestConfig configs.ID - latestMutex sync.RWMutex + logger log.Logger peer *cluster.Peer @@ -150,17 +152,12 @@ type MultitenantAlertmanager struct { } // NewMultitenantAlertmanager creates a new MultitenantAlertmanager. 
-func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg configs_client.Config) (*MultitenantAlertmanager, error) { +func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.Logger) (*MultitenantAlertmanager, error) { err := os.MkdirAll(cfg.DataDir, 0777) if err != nil { return nil, fmt.Errorf("unable to create Alertmanager data directory %q: %s", cfg.DataDir, err) } - configsAPI, err := configs_client.New(cfgCfg) - if err != nil { - return nil, err - } - var fallbackConfig []byte if cfg.FallbackConfigFile != "" { fallbackConfig, err = ioutil.ReadFile(cfg.FallbackConfigFile) @@ -176,7 +173,7 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg confi var peer *cluster.Peer if cfg.ClusterBindAddr != "" { peer, err = cluster.Create( - log.With(util.Logger, "component", "cluster"), + log.With(logger, "component", "cluster"), prometheus.DefaultRegisterer, cfg.ClusterBindAddr, cfg.ClusterAdvertiseAddr, @@ -193,18 +190,24 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg confi } err = peer.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout) if err != nil { - level.Warn(util.Logger).Log("msg", "unable to join gossip mesh", "err", err) + level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err) } go peer.Settle(context.Background(), cluster.DefaultGossipInterval) } + store, err := NewAlertStore(cfg.Store) + if err != nil { + return nil, err + } + am := &MultitenantAlertmanager{ cfg: cfg, - configsAPI: configsAPI, fallbackConfig: string(fallbackConfig), - cfgs: map[string]configs.Config{}, + cfgs: map[string]alerts.AlertConfigDesc{}, alertmanagers: map[string]*Alertmanager{}, peer: peer, + store: store, + logger: log.With(logger, "component", "MultiTenantAlertmanager"), stop: make(chan struct{}), done: make(chan struct{}), } @@ -216,14 +219,14 @@ func (am *MultitenantAlertmanager) Run() { defer close(am.done) // Load initial set of all 
configurations before polling for new ones. - am.addNewConfigs(am.loadAllConfigs()) + am.syncConfigs(am.loadAllConfigs()) ticker := time.NewTicker(am.cfg.PollInterval) for { select { - case now := <-ticker.C: - err := am.updateConfigs(now) + case <-ticker.C: + err := am.updateConfigs() if err != nil { - level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error updating configs", "err", err) + level.Warn(am.logger).Log("msg", "error updating configs", "err", err) } case <-am.stop: ticker.Stop() @@ -243,65 +246,69 @@ func (am *MultitenantAlertmanager) Stop() { am.alertmanagersMtx.Unlock() err := am.peer.Leave(am.cfg.PeerTimeout) if err != nil { - level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: failed to leave the cluster", "err", err) + level.Warn(am.logger).Log("msg", "failed to leave the cluster", "err", err) } - level.Debug(util.Logger).Log("msg", "MultitenantAlertmanager stopped") + level.Debug(am.logger).Log("msg", "stopping") } -// Load the full set of configurations from the server, retrying with backoff +// Load the full set of configurations from the alert store, retrying with backoff // until we can get them. 
-func (am *MultitenantAlertmanager) loadAllConfigs() map[string]configs.View { +func (am *MultitenantAlertmanager) loadAllConfigs() map[string]alerts.AlertConfigDesc { backoff := util.NewBackoff(context.Background(), backoffConfig) for { cfgs, err := am.poll() if err == nil { - level.Debug(util.Logger).Log("msg", "MultitenantAlertmanager: initial configuration load", "num_configs", len(cfgs)) + level.Debug(am.logger).Log("msg", "initial configuration load", "num_configs", len(cfgs)) return cfgs } - level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error fetching all configurations, backing off", "err", err) + level.Warn(am.logger).Log("msg", "error fetching all configurations, backing off", "err", err) backoff.Wait() } } -func (am *MultitenantAlertmanager) updateConfigs(now time.Time) error { +func (am *MultitenantAlertmanager) updateConfigs() error { cfgs, err := am.poll() if err != nil { return err } - am.addNewConfigs(cfgs) + am.syncConfigs(cfgs) return nil } -// poll the configuration server. Not re-entrant. -func (am *MultitenantAlertmanager) poll() (map[string]configs.View, error) { - configID := am.latestConfig - cfgs, err := am.configsAPI.GetAlerts(context.Background(), configID) +// poll the alert store. Not re-entrant. +func (am *MultitenantAlertmanager) poll() (map[string]alerts.AlertConfigDesc, error) { + cfgs, err := am.store.ListAlertConfigs(context.Background()) if err != nil { - level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: configs server poll failed", "err", err) return nil, err } - am.latestMutex.Lock() - am.latestConfig = cfgs.GetLatestConfigID() - am.latestMutex.Unlock() - return cfgs.Configs, nil + return cfgs, nil } -func (am *MultitenantAlertmanager) addNewConfigs(cfgs map[string]configs.View) { - // TODO: instrument how many configs we have, both valid & invalid. 
- level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) - for userID, config := range cfgs { - if config.IsDeleted() { - am.deleteUser(userID) - continue - } - err := am.setConfig(userID, config.Config) +func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alerts.AlertConfigDesc) { + invalid := 0 // Count the number of invalid configs as we go. + + level.Debug(am.logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) + for _, cfg := range cfgs { + err := am.setConfig(cfg) if err != nil { - level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error applying config", "err", err) - continue + invalid++ + level.Warn(am.logger).Log("msg", "error applying config", "err", err) } + } + am.alertmanagersMtx.Lock() + defer am.alertmanagersMtx.Unlock() + for user, userAM := range am.alertmanagers { + if _, exists := cfgs[user]; !exists { + level.Info(am.logger).Log("msg", "deleting per-tenant alertmanager", "user", user) + userAM.Stop() + delete(am.alertmanagers, user) + delete(am.cfgs, user) + level.Info(am.logger).Log("msg", "deleted per-tenant alertmanager", "user", user) + } } - totalConfigs.Set(float64(len(am.cfgs))) + totalConfigs.WithLabelValues("invalid").Set(float64(invalid)) + totalConfigs.WithLabelValues("valid").Set(float64(len(am.alertmanagers) - invalid)) } func (am *MultitenantAlertmanager) transformConfig(userID string, amConfig *amconfig.Config) (*amconfig.Config, error) { @@ -347,16 +354,16 @@ func (am *MultitenantAlertmanager) createTemplatesFile(userID, fn, content strin // setConfig applies the given configuration to the alertmanager for `userID`, // creating an alertmanager if it doesn't already exist. 
-func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Config) error { +func (am *MultitenantAlertmanager) setConfig(cfg alerts.AlertConfigDesc) error { am.alertmanagersMtx.Lock() - existing, hasExisting := am.alertmanagers[userID] + existing, hasExisting := am.alertmanagers[cfg.User] am.alertmanagersMtx.Unlock() - var amConfig *amconfig.Config + var userAmConfig *amconfig.Config var err error var hasTemplateChanges bool - for fn, content := range config.TemplateFiles { - hasChanged, err := am.createTemplatesFile(userID, fn, content) + for _, tmpl := range cfg.Templates { + hasChanged, err := am.createTemplatesFile(cfg.User, tmpl.Filename, tmpl.Body) if err != nil { return err } @@ -366,70 +373,55 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi } } - if config.AlertmanagerConfig == "" { + level.Debug(am.logger).Log("msg", "setting config", "user", cfg.User) + + if cfg.RawConfig == "" { if am.fallbackConfig == "" { - return fmt.Errorf("blank Alertmanager configuration for %v", userID) + return fmt.Errorf("blank Alertmanager configuration for %v", cfg.User) } - level.Info(util.Logger).Log("msg", "blank Alertmanager configuration; using fallback", "user_id", userID) - amConfig, err = amconfig.Load(am.fallbackConfig) + level.Info(am.logger).Log("msg", "blank Alertmanager configuration; using fallback", "user_id", cfg.User) + userAmConfig, err = amconfig.Load(am.fallbackConfig) if err != nil { - return fmt.Errorf("unable to load fallback configuration for %v: %v", userID, err) + return fmt.Errorf("unable to load fallback configuration for %v: %v", cfg.User, err) } } else { - amConfig, err = alertmanagerConfigFromConfig(config) + userAmConfig, err = amconfig.Load(cfg.RawConfig) if err != nil && hasExisting { // XXX: This means that if a user has a working configuration and // they submit a broken one, we'll keep processing the last known // working configuration, and they'll never know. 
// TODO: Provide a way of communicating this to the user and for removing // Alertmanager instances. - return fmt.Errorf("invalid Cortex configuration for %v: %v", userID, err) + return fmt.Errorf("invalid Cortex configuration for %v: %v", cfg.User, err) } } - if amConfig, err = am.transformConfig(userID, amConfig); err != nil { + if userAmConfig, err = am.transformConfig(cfg.User, userAmConfig); err != nil { return err } // If no Alertmanager instance exists for this user yet, start one. if !hasExisting { - newAM, err := am.newAlertmanager(userID, amConfig) + level.Debug(am.logger).Log("msg", "initializing new per-tenant alertmanager", "user", cfg.User) + newAM, err := am.newAlertmanager(cfg.User, userAmConfig) if err != nil { return err } am.alertmanagersMtx.Lock() - am.alertmanagers[userID] = newAM + am.alertmanagers[cfg.User] = newAM am.alertmanagersMtx.Unlock() - } else if am.cfgs[userID].AlertmanagerConfig != config.AlertmanagerConfig || hasTemplateChanges { + } else if am.cfgs[cfg.User].RawConfig != cfg.RawConfig || hasTemplateChanges { + level.Info(am.logger).Log("msg", "updating new per-tenant alertmanager", "user", cfg.User) // If the config changed, apply the new one. - err := existing.ApplyConfig(userID, amConfig) + err := existing.ApplyConfig(cfg.User, userAmConfig) if err != nil { - return fmt.Errorf("unable to apply Alertmanager config for user %v: %v", userID, err) + return fmt.Errorf("unable to apply Alertmanager config for user %v: %v", cfg.User, err) } } - am.cfgs[userID] = config + am.cfgs[cfg.User] = cfg return nil } -// alertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration. 
-func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) { - cfg, err := amconfig.Load(c.AlertmanagerConfig) - if err != nil { - return nil, fmt.Errorf("error parsing Alertmanager config: %s", err) - } - return cfg, nil -} - -func (am *MultitenantAlertmanager) deleteUser(userID string) { - am.alertmanagersMtx.Lock() - if existing, hasExisting := am.alertmanagers[userID]; hasExisting { - existing.Stop() - } - delete(am.alertmanagers, userID) - delete(am.cfgs, userID) - am.alertmanagersMtx.Unlock() -} - func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amconfig.Config) (*Alertmanager, error) { newAM, err := New(&Config{ UserID: userID, diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go new file mode 100644 index 00000000000..708004fd6e6 --- /dev/null +++ b/pkg/alertmanager/multitenant_test.go @@ -0,0 +1,121 @@ +// +build !race + +package alertmanager + +import ( + "context" + "io/ioutil" + "os" + "sync" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/alertmanager/alerts" + "github.com/cortexproject/cortex/pkg/util/flagext" +) + +var ( + simpleConfigOne = `route: + receiver: dummy + +receivers: + - name: dummy` + + simpleConfigTwo = `route: + receiver: dummy + +receivers: + - name: dummy` +) + +// basic easily configurable mock +type mockAlertStore struct { + configs map[string]alerts.AlertConfigDesc +} + +func (m *mockAlertStore) ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) { + return m.configs, nil +} + +// TestLoadAllConfigs ensures the multitenant alertmanager can properly load configs from a local backend store. 
+// It is excluded from the race detector due to a vendored race issue https://github.com/prometheus/alertmanager/issues/2182 +func TestLoadAllConfigs(t *testing.T) { + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{ + "user1": { + User: "user1", + RawConfig: simpleConfigOne, + Templates: []*alerts.TemplateDesc{}, + }, + "user2": { + User: "user2", + RawConfig: simpleConfigOne, + Templates: []*alerts.TemplateDesc{}, + }, + }, + } + + externalURL := flagext.URLValue{} + err := externalURL.Set("http://localhost/api/prom") + require.NoError(t, err) + + tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + am := &MultitenantAlertmanager{ + cfg: &MultitenantAlertmanagerConfig{ + ExternalURL: externalURL, + DataDir: tempDir, + }, + store: mockStore, + cfgs: map[string]alerts.AlertConfigDesc{}, + alertmanagersMtx: sync.Mutex{}, + alertmanagers: map[string]*Alertmanager{}, + logger: log.NewNopLogger(), + stop: make(chan struct{}), + done: make(chan struct{}), + } + + // Ensure the configs are synced correctly + require.NoError(t, am.updateConfigs()) + + require.Len(t, am.alertmanagers, 2) + + currentConfig, exists := am.cfgs["user1"] + require.True(t, exists) + require.Equal(t, simpleConfigOne, currentConfig.RawConfig) + + // Ensure when a 3rd config is added, it is synced correctly + mockStore.configs["user3"] = alerts.AlertConfigDesc{ + User: "user3", + RawConfig: simpleConfigOne, + Templates: []*alerts.TemplateDesc{}, + } + + require.NoError(t, am.updateConfigs()) + + require.Len(t, am.alertmanagers, 3) + + // Ensure the config is updated + mockStore.configs["user1"] = alerts.AlertConfigDesc{ + User: "user1", + RawConfig: simpleConfigTwo, + Templates: []*alerts.TemplateDesc{}, + } + + require.NoError(t, am.updateConfigs()) + + currentConfig, exists = am.cfgs["user1"] + require.True(t, exists) + require.Equal(t, simpleConfigTwo, currentConfig.RawConfig) + + // Test Delete User 
+ delete(mockStore.configs, "user3") + require.NoError(t, am.updateConfigs()) + currentConfig, exists = am.cfgs["user3"] + require.False(t, exists) + require.Equal(t, alerts.AlertConfigDesc{}, currentConfig) +} diff --git a/pkg/alertmanager/storage.go b/pkg/alertmanager/storage.go new file mode 100644 index 00000000000..7a94b676114 --- /dev/null +++ b/pkg/alertmanager/storage.go @@ -0,0 +1,47 @@ +package alertmanager + +import ( + "context" + "flag" + "fmt" + + "github.com/cortexproject/cortex/pkg/alertmanager/alerts" + "github.com/cortexproject/cortex/pkg/alertmanager/alerts/configdb" + "github.com/cortexproject/cortex/pkg/alertmanager/alerts/local" + "github.com/cortexproject/cortex/pkg/configs/client" +) + +// AlertStore stores and configures users rule configs +type AlertStore interface { + ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) +} + +// AlertStoreConfig configures the alertmanager backend +type AlertStoreConfig struct { + Type string `yaml:"type"` + ConfigDB client.Config `yaml:"configdb"` + Local local.StoreConfig `yaml:"local"` +} + +// RegisterFlags registers flags. +func (cfg *AlertStoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.Local.RegisterFlags(f) + cfg.ConfigDB.RegisterFlagsWithPrefix("alertmanager.", f) + f.StringVar(&cfg.Type, "alertmanager.storage.type", "configdb", "Type of backend to use to store alertmanager configs. 
Supported values are: \"configdb\", \"local\".") +} + +// NewAlertStore returns a new rule storage backend poller and store +func NewAlertStore(cfg AlertStoreConfig) (AlertStore, error) { + switch cfg.Type { + case "configdb": + c, err := client.New(cfg.ConfigDB) + if err != nil { + return nil, err + } + return configdb.NewStore(c), nil + case "local": + return local.NewStore(cfg.Local) + default: + return nil, fmt.Errorf("unrecognized alertmanager storage backend %v, choose one of: \"configdb\", \"local\"", cfg.Type) + } +} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 78989ff3083..7c932d5c3e6 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -21,7 +21,6 @@ import ( chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/compactor" "github.com/cortexproject/cortex/pkg/configs/api" - config_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ingester" @@ -82,7 +81,6 @@ type Config struct { Ruler ruler.Config `yaml:"ruler,omitempty"` ConfigDB db.Config `yaml:"configdb,omitempty"` - ConfigStore config_client.Config `yaml:"config_store,omitempty"` Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"` RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` @@ -118,7 +116,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Ruler.RegisterFlags(f) c.ConfigDB.RegisterFlags(f) - c.ConfigStore.RegisterFlagsWithPrefix("alertmanager.", f) c.Alertmanager.RegisterFlags(f) c.RuntimeConfig.RegisterFlags(f) c.MemberlistKV.RegisterFlags(f, "") diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index a614c436079..ab44e64f6ed 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -478,7 +478,7 @@ func (t *Cortex) stopConfigs() error { } func (t 
*Cortex) initAlertmanager(cfg *Config) (err error) { - t.alertmanager, err = alertmanager.NewMultitenantAlertmanager(&cfg.Alertmanager, cfg.ConfigStore) + t.alertmanager, err = alertmanager.NewMultitenantAlertmanager(&cfg.Alertmanager, util.Logger) if err != nil { return err } From 6f474e9e5c43eb999d62d479cc54f2825342fb66 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 17 Feb 2020 14:09:28 -0500 Subject: [PATCH 2/3] fix client function Signed-off-by: Jacob Lisi --- integration/getting_started_single_process_config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/getting_started_single_process_config_test.go b/integration/getting_started_single_process_config_test.go index 7f77e882545..b78a6fc3878 100644 --- a/integration/getting_started_single_process_config_test.go +++ b/integration/getting_started_single_process_config_test.go @@ -29,7 +29,7 @@ func TestGettingStartedSingleProcessConfig(t *testing.T) { cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009) require.NoError(t, s.StartAndWaitReady(cortex)) - c, err := e2ecortex.NewClient(cortex.Endpoint(9009), cortex.Endpoint(9009), "user-1") + c, err := e2ecortex.NewClient(cortex.Endpoint(9009), cortex.Endpoint(9009), "", "user-1") require.NoError(t, err) // Push some series to Cortex. From add608dfffce56aae2b96f77a401e2dd5e20594a Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Mon, 17 Feb 2020 14:12:51 -0500 Subject: [PATCH 3/3] update changelog Signed-off-by: Jacob Lisi --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b644078f3..c05b0813405 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,12 @@ * `--querier.query-store-after` has been added in it's place. * [CHANGE] Experimental Memberlist KV store can now be used in single-binary Cortex. Attempts to use it previously would fail with panic. 
This change also breaks existing binary protocol used to exchange gossip messages, so this version will not be able to understand gossiped Ring when used in combination with the previous version of Cortex. Easiest way to upgrade is to shutdown old Cortex installation, and restart it with new version. Incremental rollout works too, but with reduced functionality until all components run the same version. #2016 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140 +* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 * `--experimental.distributor.user-subring-size` * [FEATURE] Added flag `-experimental.ruler.enable-api` to enable the ruler api which implements the Prometheus API `/api/v1/rules` and `/api/v1/alerts` endpoints under the configured `-http.prefix`. #1999 * [FEATURE] Added sharding support to compactor when using the experimental TSDB blocks storage. #2113 -* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to guage the number of valid and invalid configs. #2125 +* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125 * [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093 * [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026