From 6223db82937c110c1d7cce606336d6813005cb9f Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Tue, 10 Dec 2019 16:37:55 -0500 Subject: [PATCH 01/11] add ruler API backed by object storage Signed-off-by: Jacob Lisi --- Makefile | 1 + integration/alertmanager_test.go | 2 +- integration/api_ruler_test.go | 64 ++++ integration/backward_compatibility_test.go | 4 +- integration/chunks_storage_backends_test.go | 2 +- integration/configs.go | 10 + integration/e2ecortex/client.go | 101 +++++- integration/e2ecortex/services.go | 19 ++ ...ting_started_single_process_config_test.go | 4 +- integration/ingester_flush_test.go | 2 +- integration/ingester_hand_over_test.go | 2 +- integration/metrics_test.go | 4 +- integration/querier_test.go | 6 +- integration/query_frontend_test.go | 4 +- pkg/chunk/aws/fixtures.go | 3 +- pkg/chunk/aws/s3_storage_client.go | 6 +- pkg/chunk/azure/blob_storage_client.go | 35 ++- pkg/chunk/gcp/fixtures.go | 2 +- pkg/chunk/gcp/gcs_object_client.go | 22 +- pkg/chunk/storage/factory.go | 6 +- pkg/ruler/api.go | 290 ++++++++++++++++++ pkg/ruler/ruler.go | 8 + pkg/ruler/rules/objectclient/rule_store.go | 164 ++++++++++ pkg/ruler/rules/store.go | 24 ++ pkg/ruler/storage.go | 31 +- pkg/ruler/store_mock_test.go | 87 ++++++ 26 files changed, 855 insertions(+), 48 deletions(-) create mode 100644 integration/api_ruler_test.go create mode 100644 pkg/ruler/rules/objectclient/rule_store.go diff --git a/Makefile b/Makefile index d384ff661a4..99367e98e34 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,7 @@ pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto +pkg/ruler/ruler.pb.go: pkg/ruler/rules/rules.proto pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto all: $(UPTODATE_FILES) diff --git 
a/integration/alertmanager_test.go b/integration/alertmanager_test.go index 89cc82de8ee..6a0139954df 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -23,7 +23,7 @@ func TestAlertmanager(t *testing.T) { require.NoError(t, s.StartAndWaitReady(alertmanager)) require.NoError(t, alertmanager.WaitSumMetrics(e2e.Equals(1), "cortex_alertmanager_configs")) - c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "user-1") + c, err := e2ecortex.NewClient("", "", alertmanager.HTTPEndpoint(), "", "user-1") require.NoError(t, err) cfg, err := c.GetAlertmanagerConfig(context.Background()) diff --git a/integration/api_ruler_test.go b/integration/api_ruler_test.go new file mode 100644 index 00000000000..bc73089e606 --- /dev/null +++ b/integration/api_ruler_test.go @@ -0,0 +1,64 @@ +// +build requires_docker + +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" +) + +func TestRulerAPI(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + dynamo := e2edb.NewDynamoDB() + minio := e2edb.NewMinio(9000, RulerConfigs["-ruler.storage.s3.buckets"]) + require.NoError(t, s.StartAndWaitReady(minio, dynamo)) + + // Start Cortex components. 
+ require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) + ruler := e2ecortex.NewRuler("ruler", mergeFlags(ChunksStorageFlags, RulerConfigs), "") + require.NoError(t, s.StartAndWaitReady(ruler)) + + c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1") + require.NoError(t, err) + + namespace := "test_namespace" + rg := rulefmt.RuleGroup{ + Name: "test_group", + Interval: 100, + Rules: []rulefmt.Rule{ + rulefmt.Rule{ + Record: "test_rule", + Expr: "up", + }, + }, + } + + require.NoError(t, c.SetRuleGroup(rg, namespace)) + + rgs, err := c.GetRuleGroups() + require.NoError(t, err) + + retrievedNamespace, exists := rgs[namespace] + require.True(t, exists) + require.Len(t, retrievedNamespace, 1) + require.Equal(t, retrievedNamespace[0].Name, rg.Name) + + // Ensure the rule group is loaded by the per-tenant Prometheus rules manager + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total")) + require.NoError(t, c.DeleteRuleGroup(namespace, rg.Name)) + + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(2), "cortex_ruler_config_updates_total")) + + _, err = c.GetRuleGroups() + require.Error(t, err) +} diff --git a/integration/backward_compatibility_test.go b/integration/backward_compatibility_test.go index e9a75d5b8c7..a5d128829d0 100644 --- a/integration/backward_compatibility_test.go +++ b/integration/backward_compatibility_test.go @@ -61,7 +61,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { now := time.Now() series, expectedVector := generateSeries("series_1", now) - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") require.NoError(t, err) res, err := c.Push(series) @@ -91,7 +91,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) // Query 
the series - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) result, err := c.Query("series_1", now) diff --git a/integration/chunks_storage_backends_test.go b/integration/chunks_storage_backends_test.go index 6444cf81a11..064ebf3d4a2 100644 --- a/integration/chunks_storage_backends_test.go +++ b/integration/chunks_storage_backends_test.go @@ -77,7 +77,7 @@ func TestChunksStorageAllIndexBackends(t *testing.T) { require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push and Query some series from Cortex for each day starting from oldest start time from configs until now so that we test all the Index Stores diff --git a/integration/configs.go b/integration/configs.go index c299c51d89d..0037e281e57 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -50,6 +50,16 @@ var ( "-alertmanager.web.external-url": "http://localhost/api/prom", } + RulerConfigs = map[string]string{ + "-ruler.enable-sharding": "false", + "-ruler.poll-interval": "5s", + "-experimental.ruler.enable-api": "true", + "-ruler.storage.type": "s3", + "-ruler.storage.s3.buckets": "cortex-rules", + "-ruler.storage.s3.force-path-style": "true", + "-ruler.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName), + } + BlocksStorageFlags = map[string]string{ "-store.engine": "tsdb", "-experimental.tsdb.backend": "s3", diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 
7efb1ba907d..a10242297a5 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "net/http" "time" @@ -16,11 +17,14 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" yaml "gopkg.in/yaml.v2" + + rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" ) // Client is a client used to interact with Cortex in integration tests type Client struct { alertmanagerClient promapi.Client + rulerAddress string distributorAddress string timeout time.Duration httpClient *http.Client @@ -29,7 +33,13 @@ type Client struct { } // NewClient makes a new Cortex client -func NewClient(distributorAddress string, querierAddress string, alertmanagerAddress string, orgID string) (*Client, error) { +func NewClient( + distributorAddress string, + querierAddress string, + alertmanagerAddress string, + rulerAddress string, + orgID string, +) (*Client, error) { // Create querier API client querierAPIClient, err := promapi.NewClient(promapi.Config{ Address: "http://" + querierAddress + "/api/prom", @@ -41,6 +51,7 @@ func NewClient(distributorAddress string, querierAddress string, alertmanagerAdd c := &Client{ distributorAddress: distributorAddress, + rulerAddress: rulerAddress, timeout: 5 * time.Second, httpClient: &http.Client{}, querierClient: promv1.NewAPI(querierAPIClient), @@ -144,3 +155,91 @@ func (c *Client) GetAlertmanagerConfig(ctx context.Context) (*alertConfig.Config return cfg, err } + +// GetRuleGroups retrieves the rule groups configured for the user from the ruler +func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) { + // Create HTTP request + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP 
request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + rgs := map[string][]rulefmt.RuleGroup{} + + data, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + err = yaml.Unmarshal(data, rgs) + if err != nil { + return nil, err + } + + return rgs, nil +} + +// SetRuleGroup creates or updates a rule group in the given namespace +func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error { + // Create write request + data, err := yaml.Marshal(rulegroup) + if err != nil { + return err + } + + // Create HTTP request + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/rules/%s", c.rulerAddress, namespace), bytes.NewReader(data)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/yaml") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + + defer res.Body.Close() + return nil +} + +// DeleteRuleGroup deletes the rule group with the given name from the namespace +func (c *Client) DeleteRuleGroup(namespace string, groupName string) error { + // Create HTTP request + req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, namespace, groupName), nil) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/yaml") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + + defer res.Body.Close() + return nil +} diff --git a/integration/e2ecortex/services.go b/integration/e2ecortex/services.go index a882db0d898..1680c27251a 100644 --- a/integration/e2ecortex/services.go +++ 
b/integration/e2ecortex/services.go @@ -210,3 +210,22 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex grpcPort, ) } + +func NewRuler(name string, flags map[string]string, image string) *CortexService { + if image == "" { + image = GetDefaultImage() + } + + return NewCortexService( + name, + image, + e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ + "-target": "ruler", + "-log.level": "warn", + }, flags))...), + // The alertmanager doesn't expose a readiness probe, so we just check if the / returns 200 + e2e.NewReadinessProbe(httpPort, "/", 200), + httpPort, + grpcPort, + ) +} diff --git a/integration/getting_started_single_process_config_test.go b/integration/getting_started_single_process_config_test.go index 634aa1f9b5b..6455b6d8a87 100644 --- a/integration/getting_started_single_process_config_test.go +++ b/integration/getting_started_single_process_config_test.go @@ -33,7 +33,7 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) { cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex)) - c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push some series to Cortex. @@ -77,7 +77,7 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) { cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex)) - c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push some series to Cortex. 
diff --git a/integration/ingester_flush_test.go b/integration/ingester_flush_test.go index 6079ea5e7bd..7a8a5dcd30a 100644 --- a/integration/ingester_flush_test.go +++ b/integration/ingester_flush_test.go @@ -48,7 +48,7 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) { require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push some series to Cortex. diff --git a/integration/ingester_hand_over_test.go b/integration/ingester_hand_over_test.go index cd88e7f7d7e..ef69f2e3a52 100644 --- a/integration/ingester_hand_over_test.go +++ b/integration/ingester_hand_over_test.go @@ -58,7 +58,7 @@ func runIngesterHandOverTest(t *testing.T, flags map[string]string, setup func(t require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push some series to Cortex. 
diff --git a/integration/metrics_test.go b/integration/metrics_test.go index 2fff267f1b2..cf2d0dfdc4e 100644 --- a/integration/metrics_test.go +++ b/integration/metrics_test.go @@ -54,7 +54,7 @@ func TestExportedMetrics(t *testing.T) { now := time.Now() series, expectedVector := generateSeries("series_1", now) - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") require.NoError(t, err) res, err := c.Push(series) @@ -63,7 +63,7 @@ func TestExportedMetrics(t *testing.T) { // Query the series both from the querier and query-frontend (to hit the read path). for _, endpoint := range []string{querier.HTTPEndpoint(), queryFrontend.HTTPEndpoint()} { - c, err := e2ecortex.NewClient("", endpoint, "", "user-1") + c, err := e2ecortex.NewClient("", endpoint, "", "", "user-1") require.NoError(t, err) result, err := c.Query("series_1", now) diff --git a/integration/querier_test.go b/integration/querier_test.go index fc576057f42..f3747811386 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -63,7 +63,7 @@ func TestQuerierWithBlocksStorage(t *testing.T) { require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // Push some series to Cortex. @@ -167,7 +167,7 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) // Push some series to Cortex. 
- c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "user-1") + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") require.NoError(t, err) series1Timestamp := time.Now() @@ -200,7 +200,7 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) // Query back the series. - c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "user-1") + c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) result, err := c.Query("series_1", series1Timestamp) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index bfed8af0c2b..39521ec03f2 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -112,7 +112,7 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { expectedVectors := make([]model.Vector, numUsers) for u := 0; u < numUsers; u++ { - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", u)) + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", fmt.Sprintf("user-%d", u)) require.NoError(t, err) var series []prompb.TimeSeries @@ -130,7 +130,7 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { for u := 0; u < numUsers; u++ { userID := u - c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", fmt.Sprintf("user-%d", userID)) + c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", userID)) require.NoError(t, err) for q := 0; q < numQueriesPerUser; q++ { diff --git a/pkg/chunk/aws/fixtures.go b/pkg/chunk/aws/fixtures.go index d0f41e4685b..a9bf9dd96c6 100644 --- a/pkg/chunk/aws/fixtures.go +++ b/pkg/chunk/aws/fixtures.go @@ -47,7 +47,8 @@ var Fixtures = []testutils.Fixture{ schemaCfg: schemaConfig, } object := objectclient.NewClient(&S3ObjectClient{ - S3: newMockS3(), 
+ S3: newMockS3(), + delimiter: chunk.DirDelim, }, nil) return index, object, table, schemaConfig, nil }, diff --git a/pkg/chunk/aws/s3_storage_client.go b/pkg/chunk/aws/s3_storage_client.go index 69efd2dd43c..9c408ac2411 100644 --- a/pkg/chunk/aws/s3_storage_client.go +++ b/pkg/chunk/aws/s3_storage_client.go @@ -57,10 +57,11 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { type S3ObjectClient struct { bucketNames []string S3 s3iface.S3API + delimiter string } // NewS3ObjectClient makes a new S3-backed ObjectClient. -func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) { +func NewS3ObjectClient(cfg S3Config, delimiter string) (*S3ObjectClient, error) { if cfg.S3.URL == nil { return nil, fmt.Errorf("no URL specified for S3") } @@ -84,6 +85,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) { client := S3ObjectClient{ S3: s3Client, bucketNames: bucketNames, + delimiter: delimiter, } return &client, nil } @@ -173,7 +175,7 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stora input := s3.ListObjectsV2Input{ Bucket: aws.String(a.bucketNames[i]), Prefix: aws.String(prefix), - Delimiter: aws.String(chunk.DirDelim), + Delimiter: aws.String(a.delimiter), } for { diff --git a/pkg/chunk/azure/blob_storage_client.go b/pkg/chunk/azure/blob_storage_client.go index df4a3373516..8372c606861 100644 --- a/pkg/chunk/azure/blob_storage_client.go +++ b/pkg/chunk/azure/blob_storage_client.go @@ -35,16 +35,21 @@ type BlobStorageConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&c.ContainerName, "azure.container-name", "cortex", "Name of the blob container used to store chunks. Defaults to `cortex`. 
This container must be created before running cortex.") - f.StringVar(&c.AccountName, "azure.account-name", "", "The Microsoft Azure account name to be used") - f.Var(&c.AccountKey, "azure.account-key", "The Microsoft Azure account key to use.") - f.DurationVar(&c.RequestTimeout, "azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage. Defaults to 30 seconds.") - f.IntVar(&c.DownloadBufferSize, "azure.download-buffer-size", 512000, "Preallocated buffer size for downloads (default is 512KB)") - f.IntVar(&c.UploadBufferSize, "azure.upload-buffer-size", 256000, "Preallocated buffer size for up;oads (default is 256KB)") - f.IntVar(&c.UploadBufferCount, "azure.download-buffer-count", 1, "Number of buffers used to used to upload a chunk. (defaults to 1)") - f.IntVar(&c.MaxRetries, "azure.max-retries", 5, "Number of retries for a request which times out.") - f.DurationVar(&c.MinRetryDelay, "azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.") - f.DurationVar(&c.MaxRetryDelay, "azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") + c.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&c.ContainerName, prefix+"azure.container-name", "cortex", "Name of the blob container used to store chunks. Defaults to `cortex`. This container must be created before running cortex.") + f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used") + f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.") + f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage. 
Defaults to 30 seconds.") + f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads (default is 512KB)") + f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads (default is 256KB)") + f.IntVar(&c.UploadBufferCount, prefix+"azure.download-buffer-count", 1, "Number of buffers used to upload a chunk. (defaults to 1)") + f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.") + f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.") + f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") } // BlobStorage is used to interact with azure blob storage for setting or getting time series chunks. @@ -53,11 +58,15 @@ type BlobStorage struct { //blobService storage.Serv cfg *BlobStorageConfig containerURL azblob.ContainerURL + delimiter string } // NewBlobStorage creates a new instance of the BlobStorage struct. 
-func NewBlobStorage(cfg *BlobStorageConfig) (*BlobStorage, error) { - blobStorage := &BlobStorage{cfg: cfg} +func NewBlobStorage(cfg *BlobStorageConfig, delimiter string) (*BlobStorage, error) { + blobStorage := &BlobStorage{ + cfg: cfg, + delimiter: delimiter, + } var err error blobStorage.containerURL, err = blobStorage.buildContainerURL() @@ -165,7 +174,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageO return nil, ctx.Err() } - listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, chunk.DirDelim, azblob.ListBlobsSegmentOptions{Prefix: prefix}) + listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, b.delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix}) if err != nil { return nil, err } diff --git a/pkg/chunk/gcp/fixtures.go b/pkg/chunk/gcp/fixtures.go index 0275e7ebe8a..123d279ab47 100644 --- a/pkg/chunk/gcp/fixtures.go +++ b/pkg/chunk/gcp/fixtures.go @@ -79,7 +79,7 @@ func (f *fixture) Clients() ( if f.gcsObjectClient { cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{ BucketName: "chunks", - }, f.gcssrv.Client()), nil) + }, f.gcssrv.Client(), chunk.DirDelim), nil) } else { cClient = newBigtableObjectClient(Config{}, schemaConfig, client) } diff --git a/pkg/chunk/gcp/gcs_object_client.go b/pkg/chunk/gcp/gcs_object_client.go index f8703ef69e3..49c497c7367 100644 --- a/pkg/chunk/gcp/gcs_object_client.go +++ b/pkg/chunk/gcp/gcs_object_client.go @@ -13,9 +13,10 @@ import ( ) type GCSObjectClient struct { - cfg GCSConfig - client *storage.Client - bucket *storage.BucketHandle + cfg GCSConfig + client *storage.Client + bucket *storage.BucketHandle + delimiter string } // GCSConfig is config for the GCS Chunk Client. @@ -38,7 +39,7 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } // NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS. 
-func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, error) { +func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, delimiter string) (*GCSObjectClient, error) { option, err := gcsInstrumentation(ctx, storage.ScopeReadWrite) if err != nil { return nil, err @@ -48,15 +49,16 @@ func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, e if err != nil { return nil, err } - return newGCSObjectClient(cfg, client), nil + return newGCSObjectClient(cfg, client, delimiter), nil } -func newGCSObjectClient(cfg GCSConfig, client *storage.Client) *GCSObjectClient { +func newGCSObjectClient(cfg GCSConfig, client *storage.Client, delimiter string) *GCSObjectClient { bucket := client.Bucket(cfg.BucketName) return &GCSObjectClient{ - cfg: cfg, - client: client, - bucket: bucket, + cfg: cfg, + client: client, + bucket: bucket, + delimiter: delimiter, } } @@ -109,7 +111,7 @@ func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, objec func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { var storageObjects []chunk.StorageObject - iter := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: chunk.DirDelim}) + iter := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: s.delimiter}) for { if ctx.Err() != nil { return nil, ctx.Err() diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index b358b2f38f4..c14d1f02505 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -175,7 +175,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun case "inmemory": return chunk.NewMockStorage(), nil case "aws", "s3": - return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config)) + return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, chunk.DirDelim)) case "aws-dynamo": if cfg.AWSStorageConfig.DynamoDB.URL == nil { return nil, 
fmt.Errorf("Must set -dynamodb.url in aws mode") @@ -186,13 +186,13 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun } return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg) case "azure": - return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig)) + return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, chunk.DirDelim)) case "gcp": return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp-columnkey", "bigtable", "bigtable-hashed": return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcs": - return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig)) + return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, chunk.DirDelim)) case "cassandra": return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) case "filesystem": diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 7cb1ec3e0ce..c199244ccbe 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -2,7 +2,9 @@ package ruler import ( "encoding/json" + "io/ioutil" "net/http" + "net/url" "sort" "strconv" "time" @@ -10,22 +12,33 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gorilla/mux" + "github.com/pkg/errors" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" + "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ingester/client" + rulefmt "github.com/cortexproject/cortex/pkg/ruler/legacy_rulefmt" + "github.com/cortexproject/cortex/pkg/ruler/rules" + store "github.com/cortexproject/cortex/pkg/ruler/rules" "github.com/cortexproject/cortex/pkg/util" ) // RegisterRoutes registers the ruler API HTTP routes with the provided Router. 
func (r *Ruler) RegisterRoutes(router *mux.Router) { + router = router.UseEncodedPath() for _, route := range []struct { name, method, path string handler http.HandlerFunc }{ {"get_rules", "GET", "/api/v1/rules", r.rules}, {"get_alerts", "GET", "/api/v1/alerts", r.alerts}, + {"list_rules", "GET", "/rules", r.listRules}, + {"list_rules_namespace", "GET", "/rules/{namespace}", r.listRules}, + {"get_rulegroup", "GET", "/rules/{namespace}/{groupName}", r.getRuleGroup}, + {"set_rulegroup", "POST", "/rules/{namespace}", r.createRuleGroup}, + {"delete_rulegroup", "DELETE", "/rules/{namespace}/{groupName}", r.deleteRuleGroup}, } { level.Debug(util.Logger).Log("msg", "ruler: registering route", "name", route.name, "method", route.method, "path", route.path) router.Handle(route.path, route.handler).Methods(route.method).Name(route.name) @@ -268,3 +281,280 @@ func (r *Ruler) alerts(w http.ResponseWriter, req *http.Request) { level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err) } } + +var ( + // ErrNoNamespace signals the requested namespace does not exist + ErrNoNamespace = errors.New("a namespace must be provided in the url") + // ErrNoGroupName signals a group name url parameter was not found + ErrNoGroupName = errors.New("a matching group name must be provided in the url") + // ErrNoRuleGroups signals the rule group requested does not exist + ErrNoRuleGroups = errors.New("no rule groups found") + // ErrNoUserID is returned when no user ID is provided + ErrNoUserID = errors.New("no id provided") +) + +// ValidateRuleGroup validates a rulegroup +func ValidateRuleGroup(g rulefmt.RuleGroup) []error { + var errs []error + for i, r := range g.Rules { + for _, err := range r.Validate() { + var ruleName string + if r.Alert != "" { + ruleName = r.Alert + } else { + ruleName = r.Record + } + errs = append(errs, &rulefmt.Error{ + Group: g.Name, + Rule: i, + RuleName: ruleName, + Err: err, + }) + } + } + + return errs +} + +func (r *Ruler) 
listRules(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + if userID == "" { + http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + return + } + + vars := mux.Vars(req) + + namespace := vars["namespace"] + if namespace != "" { + namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) + } + + level.Debug(logger).Log("msg", "retrieving rule groups from rule store", "userID", userID) + rgs, err := r.store.ListRuleGroups(req.Context(), userID, namespace) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + level.Debug(logger).Log("msg", "retrieved rule groups from rule store", "userID", userID, "num_namespaces", len(rgs)) + + if len(rgs) == 0 { + level.Info(logger).Log("msg", "no rule groups found", "userID", userID) + http.Error(w, ErrNoRuleGroups.Error(), http.StatusNotFound) + return + } + + formatted := rgs.Formatted() + + d, err := yaml.Marshal(&formatted) + if err != nil { + level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/yaml") + if _, err := w.Write(d); err != nil { + level.Error(logger).Log("msg", "error writing yaml response", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + + userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + if err != nil { + 
http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + if userID == "" { + http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + return + } + + vars := mux.Vars(req) + namespace, exists := vars["namespace"] + if !exists { + http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized) + return + } + + namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + groupName, exists := vars["groupName"] + if !exists { + http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized) + return + } + + groupName, err = url.PathUnescape(groupName) // groupName need to be unescaped if in the URL + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + rg, err := r.store.GetRuleGroup(req.Context(), userID, namespace, groupName) + if err != nil { + if err == store.ErrGroupNotFound { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + formattedRG := store.FromProto(rg) + + d, err := yaml.Marshal(&formattedRG) + if err != nil { + level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/yaml") + if _, err := w.Write(d); err != nil { + level.Error(logger).Log("msg", "error writing yaml response", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + if userID == "" { + http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + return + } + + vars := mux.Vars(req) + + 
namespace := vars["namespace"] + if namespace == "" { + level.Error(logger).Log("err", "no namespace provided with rule group") + http.Error(w, ErrNoNamespace.Error(), http.StatusBadRequest) + return + } + + namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + payload, err := ioutil.ReadAll(req.Body) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + level.Debug(logger).Log("msg", "attempting to unmarshal rulegroup", "userID", userID, "group", string(payload)) + + rg := rulefmt.RuleGroup{} + err = yaml.Unmarshal(payload, &rg) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + errs := ValidateRuleGroup(rg) + if len(errs) > 0 { + level.Error(logger).Log("err", err.Error()) + http.Error(w, errs[0].Error(), http.StatusBadRequest) + return + } + + rgProto := store.ToProto(userID, namespace, rg) + + level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String()) + err = r.store.SetRuleGroup(req.Context(), userID, namespace, rgProto) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Return a status accepted because the rule has been stored and queued for polling, but is not currently active + w.WriteHeader(http.StatusAccepted) +} + +func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + if userID == "" { + http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + return + } + + vars := mux.Vars(req) + namespace, exists := 
vars["namespace"] + if !exists { + http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized) + return + } + + namespace, err = url.PathUnescape(namespace) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + groupName, exists := vars["groupName"] + if !exists { + http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized) + return + } + + groupName, err = url.PathUnescape(groupName) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = r.store.DeleteRuleGroup(req.Context(), userID, namespace, groupName) + if err != nil { + if err == rules.ErrGroupNotFound { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Return a status accepted because the rule has been stored and queued for polling, but is not currently active + w.WriteHeader(http.StatusAccepted) +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 7fda208b9a1..f87921fa046 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -48,6 +48,11 @@ var ( Name: "ruler_config_updates_total", Help: "Total number of config updates triggered by a user", }, []string{"user"}) + managersTotal = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "ruler_managers_total", + Help: "Total number of managers registered and running in the ruler", + }) ) // Config is the configuration for the recording rules server. 
@@ -343,6 +348,9 @@ func (r *Ruler) run(ctx context.Context) error { return nil case <-tick.C: r.loadRules(ctx) + r.userManagerMtx.Lock() + managersTotal.Set(float64(len(r.userManagers))) + r.userManagerMtx.Unlock() } } } diff --git a/pkg/ruler/rules/objectclient/rule_store.go b/pkg/ruler/rules/objectclient/rule_store.go new file mode 100644 index 00000000000..f03a3d5dbf7 --- /dev/null +++ b/pkg/ruler/rules/objectclient/rule_store.go @@ -0,0 +1,164 @@ +package objectclient + +import ( + "bytes" + "context" + "io/ioutil" + strings "strings" + + "github.com/go-kit/kit/log/level" + proto "github.com/gogo/protobuf/proto" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ruler/rules" + "github.com/cortexproject/cortex/pkg/util" +) + +// Object Rule Storage Schema +// ======================= +// Object Name: "rules///" +// Storage Format: Encoded RuleGroupDesc + +const ( + rulePrefix = "rules/" +) + +// RuleStore allows cortex rules to be stored using an object store backend. 
+type RuleStore struct { + client chunk.ObjectClient +} + +// NewRuleStore returns a new RuleStore +func NewRuleStore(client chunk.ObjectClient) *RuleStore { + return &RuleStore{ + client: client, + } +} + +func (o *RuleStore) getRuleGroup(ctx context.Context, objectKey string) (*rules.RuleGroupDesc, error) { + reader, err := o.client.GetObject(ctx, objectKey) + if err == chunk.ErrStorageObjectNotFound { + level.Debug(util.Logger).Log("msg", "rule group does not exist", "name", objectKey) + return nil, rules.ErrGroupNotFound + } + + if err != nil { + return nil, err + } + defer reader.Close() + + buf, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + rg := &rules.RuleGroupDesc{} + + err = proto.Unmarshal(buf, rg) + if err != nil { + return nil, err + } + + return rg, nil +} + +// ListAllRuleGroups returns all the active rule groups +func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { + ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey("", "", "")) + if err != nil { + return nil, err + } + + userGroupMap := map[string]rules.RuleGroupList{} + for _, obj := range ruleGroupObjects { + + user := decomposeRuleObjectKey(obj.Key) + if user == "" { + continue + } + + rg, err := o.getRuleGroup(ctx, obj.Key) + if err != nil { + return nil, err + } + + if _, exists := userGroupMap[user]; !exists { + userGroupMap[user] = rules.RuleGroupList{} + } + userGroupMap[user] = append(userGroupMap[user], rg) + } + + return userGroupMap, nil +} + +// ListRuleGroups returns all the active rule groups for a user +func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { + ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, "")) + if err != nil { + return nil, err + } + + groups := []*rules.RuleGroupDesc{} + for _, obj := range ruleGroupObjects { + level.Debug(util.Logger).Log("msg", "listing rule group", "key", 
obj.Key) + + rg, err := o.getRuleGroup(ctx, obj.Key) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to retrieve rule group", "err", err, "key", obj.Key) + return nil, err + } + groups = append(groups, rg) + } + return groups, nil +} + +// GetRuleGroup returns the requested rule group +func (o *RuleStore) GetRuleGroup(ctx context.Context, userID string, namespace string, grp string) (*rules.RuleGroupDesc, error) { + handle := generateRuleObjectKey(userID, namespace, grp) + rg, err := o.getRuleGroup(ctx, handle) + if err != nil { + return nil, err + } + + return rg, nil +} + +// SetRuleGroup sets provided rule group +func (o *RuleStore) SetRuleGroup(ctx context.Context, userID string, namespace string, group *rules.RuleGroupDesc) error { + data, err := proto.Marshal(group) + if err != nil { + return err + } + + objectKey := generateRuleObjectKey(userID, namespace, group.Name) + return o.client.PutObject(ctx, objectKey, bytes.NewReader(data)) +} + +// DeleteRuleGroup deletes the specified rule group +func (o *RuleStore) DeleteRuleGroup(ctx context.Context, userID string, namespace string, groupName string) error { + objectKey := generateRuleObjectKey(userID, namespace, groupName) + err := o.client.DeleteObject(ctx, objectKey) + if err == chunk.ErrStorageObjectNotFound { + return rules.ErrGroupNotFound + } + return err +} + +func generateRuleObjectKey(id, namespace, name string) string { + if id == "" { + return rulePrefix + } + prefix := rulePrefix + id + "/" + if namespace == "" { + return prefix + } + return prefix + namespace + "/" + name +} + +func decomposeRuleObjectKey(handle string) string { + components := strings.Split(handle, "/") + if len(components) != 4 { + return "" + } + return components[1] +} diff --git a/pkg/ruler/rules/store.go b/pkg/ruler/rules/store.go index 6cca7de3768..d48777c3771 100644 --- a/pkg/ruler/rules/store.go +++ b/pkg/ruler/rules/store.go @@ -23,6 +23,10 @@ var ( // RuleStore is used to store and retrieve rules type 
RuleStore interface { ListAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) + ListRuleGroups(ctx context.Context, userID string, namespace string) (RuleGroupList, error) + GetRuleGroup(ctx context.Context, userID, namespace, group string) (*RuleGroupDesc, error) + SetRuleGroup(ctx context.Context, userID, namespace string, group *RuleGroupDesc) error + DeleteRuleGroup(ctx context.Context, userID, namespace string, group string) error } // RuleGroupList contains a set of rule groups @@ -106,3 +110,23 @@ func getLatestConfigID(cfgs map[string]userconfig.VersionedRulesConfig, latest u } return ret } + +// ListRuleGroups is not implemented +func (c *ConfigRuleStore) ListRuleGroups(ctx context.Context, userID string, namespace string) (RuleGroupList, error) { + return nil, errors.New("not implemented by the config service rule store") +} + +// GetRuleGroup is not implemented +func (c *ConfigRuleStore) GetRuleGroup(ctx context.Context, userID, namespace, group string) (*RuleGroupDesc, error) { + return nil, errors.New("not implemented by the config service rule store") +} + +// SetRuleGroup is not implemented +func (c *ConfigRuleStore) SetRuleGroup(ctx context.Context, userID, namespace string, group *RuleGroupDesc) error { + return errors.New("not implemented by the config service rule store") +} + +// DeleteRuleGroup is not implemented +func (c *ConfigRuleStore) DeleteRuleGroup(ctx context.Context, userID, namespace string, group string) error { + return errors.New("not implemented by the config service rule store") +} diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go index 536a9989342..5262312608d 100644 --- a/pkg/ruler/storage.go +++ b/pkg/ruler/storage.go @@ -1,11 +1,17 @@ package ruler import ( + "context" "flag" "fmt" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/aws" + "github.com/cortexproject/cortex/pkg/chunk/azure" + "github.com/cortexproject/cortex/pkg/chunk/gcp" 
"github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/ruler/rules" + "github.com/cortexproject/cortex/pkg/ruler/rules/objectclient" ) // RuleStoreConfig conigures a rule store @@ -13,13 +19,21 @@ type RuleStoreConfig struct { Type string `yaml:"type"` ConfigDB client.Config + // Object Storage Configs + Azure azure.BlobStorageConfig `yaml:"azure,omitempty"` + GCS gcp.GCSConfig `yaml:"gcs,omitempty"` + S3 aws.S3Config `yaml:"s3,omitempty"` + mock rules.RuleStore } // RegisterFlags registers flags. func (cfg *RuleStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.ConfigDB.RegisterFlagsWithPrefix("ruler.", f) - f.StringVar(&cfg.Type, "ruler.storage.type", "configdb", "Method to use for backend rule storage (configdb)") + cfg.Azure.RegisterFlagsWithPrefix("ruler.storage.", f) + cfg.GCS.RegisterFlagsWithPrefix("ruler.storage.", f) + cfg.S3.RegisterFlagsWithPrefix("ruler.storage.", f) + f.StringVar(&cfg.Type, "ruler.storage.type", "configdb", "Method to use for backend rule storage (configdb, azure, gcs, s3)") } // NewRuleStorage returns a new rule storage backend poller and store @@ -37,7 +51,20 @@ func NewRuleStorage(cfg RuleStoreConfig) (rules.RuleStore, error) { } return rules.NewConfigRuleStore(c), nil + case "azure": + return newObjRuleStore(azure.NewBlobStorage(&cfg.Azure, "")) + case "gcs": + return newObjRuleStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCS, "")) + case "s3": + return newObjRuleStore(aws.NewS3ObjectClient(cfg.S3, "")) default: - return nil, fmt.Errorf("Unrecognized rule storage mode %v, choose one of: configdb", cfg.Type) + return nil, fmt.Errorf("Unrecognized rule storage mode %v, choose one of: configdb, gcs", cfg.Type) + } +} + +func newObjRuleStore(client chunk.ObjectClient, err error) (rules.RuleStore, error) { + if err != nil { + return nil, err } + return objectclient.NewRuleStore(client), nil } diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 
f1c81abed25..81cb47dafa7 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -58,3 +58,90 @@ func newMockRuleStore(rules map[string]rules.RuleGroupList) *mockRuleStore { func (m *mockRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { return m.rules, nil } + +func (m *mockRuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { + userRules, exists := m.rules[userID] + if !exists { + return nil, rules.ErrUserNotFound + } + + if namespace == "" { + return userRules, nil + } + + namespaceRules := rules.RuleGroupList{} + + for _, rg := range userRules { + if rg.Namespace == namespace { + namespaceRules = append(namespaceRules, rg) + } + } + + if len(namespaceRules) == 0 { + return nil, rules.ErrGroupNamespaceNotFound + } + + return namespaceRules, nil +} + +func (m *mockRuleStore) GetRuleGroup(ctx context.Context, userID string, namespace string, group string) (*rules.RuleGroupDesc, error) { + userRules, exists := m.rules[userID] + if !exists { + return nil, rules.ErrUserNotFound + } + + if namespace == "" { + return nil, rules.ErrGroupNamespaceNotFound + } + + for _, rg := range userRules { + if rg.Namespace == namespace && rg.Name == group { + return rg, nil + } + } + + return nil, rules.ErrGroupNotFound +} + +func (m *mockRuleStore) SetRuleGroup(ctx context.Context, userID string, namespace string, group *rules.RuleGroupDesc) error { + userRules, exists := m.rules[userID] + if !exists { + userRules = rules.RuleGroupList{} + m.rules[userID] = userRules + } + + if namespace == "" { + return rules.ErrGroupNamespaceNotFound + } + + for i, rg := range userRules { + if rg.Namespace == namespace && rg.Name == group.Name { + userRules[i] = group + return nil + } + } + + m.rules[userID] = append(userRules, group) + return nil +} + +func (m *mockRuleStore) DeleteRuleGroup(ctx context.Context, userID string, namespace string, group string) error { + userRules, 
exists := m.rules[userID] + if !exists { + userRules = rules.RuleGroupList{} + m.rules[userID] = userRules + } + + if namespace == "" { + return rules.ErrGroupNamespaceNotFound + } + + for i, rg := range userRules { + if rg.Namespace == namespace && rg.Name == group { + m.rules[userID] = append(userRules[:i], userRules[i+1:]...) + return nil + } + } + + return nil +} From 5d19596ab232d6f1191af8df22858a17039c8e81 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 15:28:13 -0400 Subject: [PATCH 02/11] simplify and comment ruler storage api Signed-off-by: Jacob Lisi --- pkg/ruler/api.go | 136 ++++++++++++++++++++++------------------- 1 file changed, 65 insertions(+), 71 deletions(-) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index c199244ccbe..00ae4c3c7f5 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -291,6 +291,8 @@ var ( ErrNoRuleGroups = errors.New("no rule groups found") // ErrNoUserID is returned when no user ID is provided ErrNoUserID = errors.New("no id provided") + // ErrBadRuleGroup is returned when the provided rule group can not be unmarshalled + ErrBadRuleGroup = errors.New("unable to decoded rule group") ) // ValidateRuleGroup validates a rulegroup @@ -316,29 +318,58 @@ func ValidateRuleGroup(g rulefmt.RuleGroup) []error { return errs } -func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { - logger := util.WithContext(req.Context(), util.Logger) - userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) +// parseNamespace parses the namespace from the provided set of params, in this +// api these params are derived from the url path +func parseNamespace(params map[string]string) (string, error) { + namespace, exists := params["namespace"] + if !exists { + return "", ErrNoNamespace + } + + namespace, err := url.PathUnescape(namespace) // namespaces needs to be unescaped if in the URL if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return + return "", err + } + + return namespace, 
nil +} + +// parseGroupName parses the group name from the provided set of params, in this +// api these params are derived from the url path +func parseGroupName(params map[string]string) (string, error) { + groupName, exists := params["groupName"] + if !exists { + return "", ErrNoGroupName + } + + groupName, err := url.PathUnescape(groupName) // groupName needs to be unescaped if in the URL + if err != nil { + return "", err } - if userID == "" { - http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + return groupName, nil +} + +func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + userID, err := user.ExtractOrgID(req.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } vars := mux.Vars(req) - namespace := vars["namespace"] - if namespace != "" { - namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL - if err != nil { + // Parse the namespace from the url path parameters and return + // a 400 if it is invalid and return a full set of rules if no + // namespace is provided + namespace, err := parseNamespace(vars) + if err != nil { + // If a namespace is not provided continue as usual and return a full set of rules + if err != ErrNoNamespace { http.Error(w, err.Error(), http.StatusBadRequest) return } - level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) } @@ -376,38 +407,25 @@ func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - - userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + userID, err := user.ExtractOrgID(req.Context()) if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - if userID == "" { - http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + 
http.Error(w, err.Error(), http.StatusBadRequest) return } vars := mux.Vars(req) - namespace, exists := vars["namespace"] - if !exists { - http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized) - return - } - namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL + // Parse the namespace from the url path parameters and return + // a 400 if it is invalid + namespace, err := parseNamespace(vars) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - groupName, exists := vars["groupName"] - if !exists { - http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized) - return - } - - groupName, err = url.PathUnescape(groupName) // groupName need to be unescaped if in the URL + // Parse the rule group name from the url path parameters and return + // a 400 if it is invalid + groupName, err := parseGroupName(vars) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -442,27 +460,15 @@ func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + userID, err := user.ExtractOrgID(req.Context()) if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - if userID == "" { - http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) - return - } - - vars := mux.Vars(req) - - namespace := vars["namespace"] - if namespace == "" { - level.Error(logger).Log("err", "no namespace provided with rule group") - http.Error(w, ErrNoNamespace.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusBadRequest) return } - namespace, err = url.PathUnescape(namespace) // namespaces need to be unescaped if in the URL + // Parse the namespace from the url path parameters and return + // a 400 if it is invalid + namespace, err := parseNamespace(mux.Vars(req)) 
if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -481,7 +487,7 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { err = yaml.Unmarshal(payload, &rg) if err != nil { level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, ErrBadRuleGroup.Error(), http.StatusBadRequest) return } @@ -508,37 +514,25 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - userID, _, err := user.ExtractOrgIDFromHTTPRequest(req) + userID, err := user.ExtractOrgID(req.Context()) if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - if userID == "" { - http.Error(w, ErrNoUserID.Error(), http.StatusUnauthorized) + http.Error(w, err.Error(), http.StatusBadRequest) return } vars := mux.Vars(req) - namespace, exists := vars["namespace"] - if !exists { - http.Error(w, ErrNoNamespace.Error(), http.StatusUnauthorized) - return - } - namespace, err = url.PathUnescape(namespace) + // Parse the namespace from the url path parameters and return + // a 400 if it is invalid + namespace, err := parseNamespace(vars) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - groupName, exists := vars["groupName"] - if !exists { - http.Error(w, ErrNoGroupName.Error(), http.StatusUnauthorized) - return - } - - groupName, err = url.PathUnescape(groupName) + // Parse the rule group name from the url path parameters and return + // a 400 if it is invalid + groupName, err := parseGroupName(vars) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return From 572f9b339cda819583020cce8a3abdf6e6bd9ac2 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 15:41:56 -0400 Subject: [PATCH 03/11] wrap calls to ruler with auth middleware Signed-off-by: Jacob Lisi --- 
pkg/cortex/modules.go | 2 +- pkg/ruler/api.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 4b113a1f8e2..861f52841e3 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -402,7 +402,7 @@ func (t *Cortex) initRuler(cfg *Config) (serv services.Service, err error) { if cfg.Ruler.EnableAPI { subrouter := t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Subrouter() - t.ruler.RegisterRoutes(subrouter) + t.ruler.RegisterRoutes(subrouter, t.httpAuthMiddleware) ruler.RegisterRulerServer(t.server.GRPC, t.ruler) } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 00ae4c3c7f5..93568e20d9f 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "gopkg.in/yaml.v2" @@ -26,7 +27,7 @@ import ( ) // RegisterRoutes registers the ruler API HTTP routes with the provided Router. 
-func (r *Ruler) RegisterRoutes(router *mux.Router) { +func (r *Ruler) RegisterRoutes(router *mux.Router, middleware middleware.Interface) { router = router.UseEncodedPath() for _, route := range []struct { name, method, path string @@ -41,7 +42,7 @@ func (r *Ruler) RegisterRoutes(router *mux.Router) { {"delete_rulegroup", "DELETE", "/rules/{namespace}/{groupName}", r.deleteRuleGroup}, } { level.Debug(util.Logger).Log("msg", "ruler: registering route", "name", route.name, "method", route.method, "path", route.path) - router.Handle(route.path, route.handler).Methods(route.method).Name(route.name) + router.Handle(route.path, middleware.Wrap(route.handler)).Methods(route.method).Name(route.name) } } From a5c00639fbe71e37db4e2e59a47301a39871daad Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 15:50:52 -0400 Subject: [PATCH 04/11] use saner metrics for ruler integration test Signed-off-by: Jacob Lisi --- integration/api_ruler_test.go | 5 ++--- integration/configs.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/integration/api_ruler_test.go b/integration/api_ruler_test.go index bc73089e606..bb5092733ec 100644 --- a/integration/api_ruler_test.go +++ b/integration/api_ruler_test.go @@ -54,10 +54,9 @@ func TestRulerAPI(t *testing.T) { require.Equal(t, retrievedNamespace[0].Name, rg.Name) // Ensure the rule group is loaded by the per-tenant Prometheus rules manager - require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_cortex_ruler_managers_total")) + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total")) require.NoError(t, c.DeleteRuleGroup(namespace, rg.Name)) - - require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(2), "cortex_ruler_config_updates_total")) + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(0), "cortex_ruler_managers_total")) _, err = c.GetRuleGroups() require.Error(t, err) diff --git a/integration/configs.go b/integration/configs.go index 0037e281e57..c852393c075 100644 --- 
a/integration/configs.go +++ b/integration/configs.go @@ -52,7 +52,7 @@ var ( RulerConfigs = map[string]string{ "-ruler.enable-sharding": "false", - "-ruler.poll-interval": "5s", + "-ruler.poll-interval": "2s", "-experimental.ruler.enable-api": "true", "-ruler.storage.type": "s3", "-ruler.storage.s3.buckets": "cortex-rules", From 7d3469000b9487e7fd87741e5ab3c772705fc601 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 16:03:35 -0400 Subject: [PATCH 05/11] remove omitempty from configs Signed-off-by: Jacob Lisi --- pkg/ruler/storage.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go index 5262312608d..6bface18117 100644 --- a/pkg/ruler/storage.go +++ b/pkg/ruler/storage.go @@ -16,15 +16,15 @@ import ( // RuleStoreConfig conigures a rule store type RuleStoreConfig struct { - Type string `yaml:"type"` - ConfigDB client.Config + Type string `yaml:"type"` + ConfigDB client.Config `yaml:"configdb"` // Object Storage Configs - Azure azure.BlobStorageConfig `yaml:"azure,omitempty"` - GCS gcp.GCSConfig `yaml:"gcs,omitempty"` - S3 aws.S3Config `yaml:"s3,omitempty"` + Azure azure.BlobStorageConfig `yaml:"azure"` + GCS gcp.GCSConfig `yaml:"gcs"` + S3 aws.S3Config `yaml:"s3"` - mock rules.RuleStore + mock rules.RuleStore `yaml:"-"` } // RegisterFlags registers flags. 
From 42fa09c3ed26a129a30faded36692f8d326af95b Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 16:07:34 -0400 Subject: [PATCH 06/11] add clarifying comments for ruler integration tests and reorder metric checks Signed-off-by: Jacob Lisi --- integration/api_ruler_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/integration/api_ruler_test.go b/integration/api_ruler_test.go index bb5092733ec..965fba1492a 100644 --- a/integration/api_ruler_test.go +++ b/integration/api_ruler_test.go @@ -28,9 +28,11 @@ func TestRulerAPI(t *testing.T) { ruler := e2ecortex.NewRuler("ruler", mergeFlags(ChunksStorageFlags, RulerConfigs), "") require.NoError(t, s.StartAndWaitReady(ruler)) + // Create a client with the ruler address configured c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1") require.NoError(t, err) + // Create example namespace and rule group to use for tests namespace := "test_namespace" rg := rulefmt.RuleGroup{ Name: "test_group", @@ -43,8 +45,13 @@ func TestRulerAPI(t *testing.T) { }, } + // Set the rule group into the ruler require.NoError(t, c.SetRuleGroup(rg, namespace)) + // Wait until the user manager is created + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total")) + + // Check to ensure the rules running in the ruler match what was set rgs, err := c.GetRuleGroups() require.NoError(t, err) @@ -53,11 +60,13 @@ func TestRulerAPI(t *testing.T) { require.Len(t, retrievedNamespace, 1) require.Equal(t, retrievedNamespace[0].Name, rg.Name) - // Ensure the rule group is loaded by the per-tenant Prometheus rules manager - require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1), "cortex_ruler_managers_total")) + // Delete the set rule group require.NoError(t, c.DeleteRuleGroup(namespace, rg.Name)) + + // Wait until the users manager has been terminated require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(0), "cortex_ruler_managers_total")) + // Check to ensure the 
rule groups are no longer active _, err = c.GetRuleGroups() require.Error(t, err) } From 5cbecc765524c866dda15ca81a658e889c7e25ca Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 16:11:47 -0400 Subject: [PATCH 07/11] update changelog Signed-off-by: Jacob Lisi --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 685122a0119..d16fb3ecb7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ * [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226 * [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes issue where frontend worker started to send queries to querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246 * [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you're vendoring Cortex and use Lifecycler) #2251 +* [FEATURE] Added experimental storage API to the ruler service that is enabled when the `-experimental.ruler.enable-api` is set to true #2269 + * `-ruler.storage.type` flag now allows `s3`,`gcs`, and `azure` values + * `-ruler.storage.(s3|gcs|azure)` flags exist to allow the configuration of object clients set for rule storage * [FEATURE] Flusher target to flush the WAL. * `-flusher.wal-dir` for the WAL directory to recover from. * `-flusher.concurrent-flushes` for number of concurrent flushes. 
From fda86bf3788f6067915e06ae189ac8cc6bcc391a Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 12 Mar 2020 16:14:01 -0400 Subject: [PATCH 08/11] update docs Signed-off-by: Jacob Lisi --- docs/configuration/config-file-reference.md | 75 ++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 01f1f890542..3a6771f87d1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -708,7 +708,7 @@ The `ruler_config` configures the Cortex ruler. [pollinterval: | default = 1m0s] storeconfig: - # Method to use for backend rule storage (configdb) + # Method to use for backend rule storage (configdb, azure, gcs, s3) # CLI flag: -ruler.storage.type [type: | default = "configdb"] @@ -717,6 +717,79 @@ storeconfig: # The CLI flags prefix for this block config is: ruler [configdb: ] + azure: + # Name of the blob container used to store chunks. Defaults to `cortex`. + # This container must be created before running cortex. + # CLI flag: -ruler.storage.azure.container-name + [container_name: | default = "cortex"] + + # The Microsoft Azure account name to be used + # CLI flag: -ruler.storage.azure.account-name + [account_name: | default = ""] + + # The Microsoft Azure account key to use. + # CLI flag: -ruler.storage.azure.account-key + [account_key: | default = ""] + + # Preallocated buffer size for downloads (default is 512KB) + # CLI flag: -ruler.storage.azure.download-buffer-size + [download_buffer_size: | default = 512000] + + # Preallocated buffer size for up;oads (default is 256KB) + # CLI flag: -ruler.storage.azure.upload-buffer-size + [upload_buffer_size: | default = 256000] + + # Number of buffers used to used to upload a chunk. (defaults to 1) + # CLI flag: -ruler.storage.azure.download-buffer-count + [upload_buffer_count: | default = 1] + + # Timeout for requests made against azure blob storage. 
Defaults to 30 + # seconds. + # CLI flag: -ruler.storage.azure.request-timeout + [request_timeout: | default = 30s] + + # Number of retries for a request which times out. + # CLI flag: -ruler.storage.azure.max-retries + [max_retries: | default = 5] + + # Minimum time to wait before retrying a request. + # CLI flag: -ruler.storage.azure.min-retry-delay + [min_retry_delay: | default = 10ms] + + # Maximum time to wait before retrying a request. + # CLI flag: -ruler.storage.azure.max-retry-delay + [max_retry_delay: | default = 500ms] + + gcs: + # Name of GCS bucket to put chunks in. + # CLI flag: -ruler.storage.gcs.bucketname + [bucket_name: | default = ""] + + # The size of the buffer that the GCS client uses for each PUT request. 0 to disable + # buffering. + # CLI flag: -ruler.storage.gcs.chunk-buffer-size + [chunk_buffer_size: | default = 0] + + # The duration after which the requests to GCS should be timed out. + # CLI flag: -ruler.storage.gcs.request-timeout + [request_timeout: | default = 0s] + + s3: + # S3 endpoint URL with escaped Key and Secret encoded. If only region is + # specified as a host, proper endpoint will be deduced. Use + # inmemory:/// to use a mock in-memory implementation. + # CLI flag: -ruler.storage.s3.url + [s3: | default = ] + + # Comma separated list of bucket names to evenly distribute chunks over. + # Overrides any buckets specified in s3.url flag + # CLI flag: -ruler.storage.s3.buckets + [bucketnames: | default = ""] + + # Set this to `true` to force the request to use path-style addressing. 
+ # CLI flag: -ruler.storage.s3.force-path-style + [s3forcepathstyle: | default = false] + # file path to store temporary rule files for the prometheus rule managers # CLI flag: -ruler.rule-path [rulepath: | default = "/rules"] From f1d626f92c8dd2a6b95b24d32747c5629859f14b Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Wed, 18 Mar 2020 12:03:33 -0400 Subject: [PATCH 09/11] Update pkg/ruler/api.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Peter Štibraný Signed-off-by: Jacob Lisi --- pkg/ruler/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 93568e20d9f..d5b5aec0597 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -287,7 +287,7 @@ var ( // ErrNoNamespace signals the requested namespace does not exist ErrNoNamespace = errors.New("a namespace must be provided in the url") // ErrNoGroupName signals a group name url parameter was not found - ErrNoGroupName = errors.New("a matching group name must be provided in the url") + ErrNoGroupName = errors.New("a matching group name must be provided in the request") // ErrNoRuleGroups signals the rule group requested does not exist ErrNoRuleGroups = errors.New("no rule groups found") // ErrNoUserID is returned when no user ID is provided From 5d88330ab21a8aa0f4d6f967340682c5646c9770 Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Wed, 18 Mar 2020 13:34:22 -0400 Subject: [PATCH 10/11] refactor per PR comments Signed-off-by: Jacob Lisi --- integration/api_ruler_test.go | 7 +- integration/e2ecortex/client.go | 5 +- pkg/ruler/api.go | 179 +++++++++++++------------------- 3 files changed, 81 insertions(+), 110 deletions(-) diff --git a/integration/api_ruler_test.go b/integration/api_ruler_test.go index 965fba1492a..b1f44a8c3c0 100644 --- a/integration/api_ruler_test.go +++ b/integration/api_ruler_test.go @@ -32,10 +32,11 @@ func TestRulerAPI(t *testing.T) { c, err := e2ecortex.NewClient("", "", "", 
ruler.HTTPEndpoint(), "user-1") require.NoError(t, err) - // Create example namespace and rule group to use for tests - namespace := "test_namespace" + // Create example namespace and rule group to use for tests, using strings that + // require url escaping. + namespace := "test_encoded_+namespace?" rg := rulefmt.RuleGroup{ - Name: "test_group", + Name: "test_encoded_+\"+group_name?", Interval: 100, Rules: []rulefmt.Rule{ rulefmt.Rule{ diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index a10242297a5..b16c0aac591 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "time" "github.com/gogo/protobuf/proto" @@ -199,7 +200,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err } // Create HTTP request - req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/rules/%s", c.rulerAddress, namespace), bytes.NewReader(data)) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/rules/%s", c.rulerAddress, url.PathEscape(namespace)), bytes.NewReader(data)) if err != nil { return err } @@ -223,7 +224,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err // DeleteRuleGroup gets the status of an alertmanager instance func (c *Client) DeleteRuleGroup(namespace string, groupName string) error { // Create HTTP request - req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, namespace, groupName), nil) + req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil) if err != nil { return err } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index d5b5aec0597..e96a3d63977 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -28,6 +28,8 @@ import ( // RegisterRoutes registers the ruler API HTTP routes with the provided Router. 
func (r *Ruler) RegisterRoutes(router *mux.Router, middleware middleware.Interface) { + // Routes for this API must be encoded to allow for various characters to be + // present in the path URL router = router.UseEncodedPath() for _, route := range []struct { name, method, path string @@ -284,14 +286,12 @@ func (r *Ruler) alerts(w http.ResponseWriter, req *http.Request) { } var ( - // ErrNoNamespace signals the requested namespace does not exist - ErrNoNamespace = errors.New("a namespace must be provided in the url") + // ErrNoNamespace signals that no namespace was specified in the request + ErrNoNamespace = errors.New("a namespace must be provided in the request") // ErrNoGroupName signals a group name url parameter was not found ErrNoGroupName = errors.New("a matching group name must be provided in the request") // ErrNoRuleGroups signals the rule group requested does not exist ErrNoRuleGroups = errors.New("no rule groups found") - // ErrNoUserID is returned when no user ID is provided - ErrNoUserID = errors.New("no id provided") // ErrBadRuleGroup is returned when the provided rule group can not be unmarshalled ErrBadRuleGroup = errors.New("unable to decoded rule group") ) @@ -319,6 +319,39 @@ func ValidateRuleGroup(g rulefmt.RuleGroup) []error { return errs } +func marshalAndSend(formatted interface{}, w http.ResponseWriter, logger log.Logger) { + d, err := yaml.Marshal(&formatted) + if err != nil { + level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/yaml") + if _, err := w.Write(d); err != nil { + level.Error(logger).Log("msg", "error writing yaml response", "err", err) + return + } +} + +func respondAccepted(w http.ResponseWriter, logger log.Logger) { + b, err := json.Marshal(&response{ + Status: "success", + }) + if err != nil { + level.Error(logger).Log("msg", "error marshaling json response", "err", err) 
+ respondError(logger, w, "unable to marshal the requested data") + return + } + w.Header().Set("Content-Type", "application/json") + + // Return a status accepted because the rule has been stored and queued for polling, but is not currently active + w.WriteHeader(http.StatusAccepted) + if n, err := w.Write(b); err != nil { + level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err) + } +} + // parseNamespace parses the namespace from the provided set of params, in this // api these params are derived from the url path func parseNamespace(params map[string]string) (string, error) { @@ -327,7 +360,7 @@ func parseNamespace(params map[string]string) (string, error) { return "", ErrNoNamespace } - namespace, err := url.PathUnescape(namespace) // namespaces needs to be unescaped if in the URL + namespace, err := url.PathUnescape(namespace) if err != nil { return "", err } @@ -343,7 +376,7 @@ func parseGroupName(params map[string]string) (string, error) { return "", ErrNoGroupName } - groupName, err := url.PathUnescape(groupName) // groupName needs to be unescaped if in the URL + groupName, err := url.PathUnescape(groupName) if err != nil { return "", err } @@ -351,30 +384,40 @@ func parseGroupName(params map[string]string) (string, error) { return groupName, nil } -func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { - logger := util.WithContext(req.Context(), util.Logger) - userID, err := user.ExtractOrgID(req.Context()) +func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (string, string, string, error) { + id, err := user.ExtractOrgID(req.Context()) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + return "", "", "", user.ErrNoOrgID } vars := mux.Vars(req) - // Parse the namespace from the url path parameters and return - // a 400 if it is invalid and return a full set of rules if no - // namespace is provided namespace, err := parseNamespace(vars) if err != nil { - // 
If a namespace is not provided continue as usual and return a full set of rules - if err != ErrNoNamespace { - http.Error(w, err.Error(), http.StatusBadRequest) - return + if err != ErrNoNamespace || requireNamespace { + return "", "", "", err } - level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) } - level.Debug(logger).Log("msg", "retrieving rule groups from rule store", "userID", userID) + group, err := parseGroupName(vars) + if err != nil { + if err != ErrNoGroupName || requireGroup { + return "", "", "", err + } + } + + return id, namespace, group, nil +} + +func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { + logger := util.WithContext(req.Context(), util.Logger) + + userID, namespace, _, err := parseRequest(req, false, false) + if err != nil { + respondError(logger, w, err.Error()) + } + + level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) rgs, err := r.store.ListRuleGroups(req.Context(), userID, namespace) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -390,46 +433,14 @@ func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { } formatted := rgs.Formatted() - - d, err := yaml.Marshal(&formatted) - if err != nil { - level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/yaml") - if _, err := w.Write(d); err != nil { - level.Error(logger).Log("msg", "error writing yaml response", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + marshalAndSend(formatted, w, logger) } func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - userID, err := user.ExtractOrgID(req.Context()) - if err != nil { - http.Error(w, err.Error(), 
http.StatusBadRequest) - return - } - - vars := mux.Vars(req) - - // Parse the namespace from the url path parameters and return - // a 400 if it is invalid - namespace, err := parseNamespace(vars) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - // Parse the rule group name from the url path parameters and return - // a 400 if it is invalid - groupName, err := parseGroupName(vars) + userID, namespace, groupName, err := parseRequest(req, true, true) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + respondError(logger, w, err.Error()) } rg, err := r.store.GetRuleGroup(req.Context(), userID, namespace, groupName) @@ -442,37 +453,15 @@ func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { return } - formattedRG := store.FromProto(rg) - - d, err := yaml.Marshal(&formattedRG) - if err != nil { - level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/yaml") - if _, err := w.Write(d); err != nil { - level.Error(logger).Log("msg", "error writing yaml response", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + formatted := store.FromProto(rg) + marshalAndSend(formatted, w, logger) } func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - userID, err := user.ExtractOrgID(req.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - // Parse the namespace from the url path parameters and return - // a 400 if it is invalid - namespace, err := parseNamespace(mux.Vars(req)) + userID, namespace, _, err := parseRequest(req, true, false) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + respondError(logger, w, err.Error()) } payload, err := ioutil.ReadAll(req.Body) @@ 
-509,34 +498,15 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { return } - // Return a status accepted because the rule has been stored and queued for polling, but is not currently active - w.WriteHeader(http.StatusAccepted) + respondAccepted(w, logger) } func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util.WithContext(req.Context(), util.Logger) - userID, err := user.ExtractOrgID(req.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - vars := mux.Vars(req) - - // Parse the namespace from the url path parameters and return - // a 400 if it is invalid - namespace, err := parseNamespace(vars) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - // Parse the rule group name from the url path parameters and return - // a 400 if it is invalid - groupName, err := parseGroupName(vars) + userID, namespace, groupName, err := parseRequest(req, true, true) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + respondError(logger, w, err.Error()) } err = r.store.DeleteRuleGroup(req.Context(), userID, namespace, groupName) @@ -550,6 +520,5 @@ func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { return } - // Return a status accepted because the rule has been stored and queued for polling, but is not currently active - w.WriteHeader(http.StatusAccepted) + respondAccepted(w, logger) } From 0876dd651b9626de1f32aa80a0e0ebffeef626cf Mon Sep 17 00:00:00 2001 From: Jacob Lisi Date: Thu, 19 Mar 2020 11:16:15 -0400 Subject: [PATCH 11/11] fix API per PR comments Signed-off-by: Jacob Lisi --- pkg/ruler/api.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index e96a3d63977..bf521e41cee 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -319,8 +319,8 @@ func ValidateRuleGroup(g rulefmt.RuleGroup) []error { 
return errs } -func marshalAndSend(formatted interface{}, w http.ResponseWriter, logger log.Logger) { - d, err := yaml.Marshal(&formatted) +func marshalAndSend(output interface{}, w http.ResponseWriter, logger log.Logger) { + d, err := yaml.Marshal(&output) if err != nil { level.Error(logger).Log("msg", "error marshalling yaml rule groups", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -384,8 +384,11 @@ func parseGroupName(params map[string]string) (string, error) { return groupName, nil } +// parseRequest parses the incoming request to extract the userID, rules namespace, and rule group name +// and returns them in that order. It also allows users to require a namespace or group name and return +// an error if they cannot be parsed. func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (string, string, string, error) { - id, err := user.ExtractOrgID(req.Context()) + userID, err := user.ExtractOrgID(req.Context()) if err != nil { return "", "", "", user.ErrNoOrgID } @@ -406,7 +409,7 @@ func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (strin } } - return id, namespace, group, nil + return userID, namespace, group, nil } func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { @@ -415,6 +418,7 @@ func (r *Ruler) listRules(w http.ResponseWriter, req *http.Request) { userID, namespace, _, err := parseRequest(req, false, false) if err != nil { respondError(logger, w, err.Error()) + return } level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) @@ -441,6 +445,7 @@ func (r *Ruler) getRuleGroup(w http.ResponseWriter, req *http.Request) { userID, namespace, groupName, err := parseRequest(req, true, true) if err != nil { respondError(logger, w, err.Error()) + return } rg, err := r.store.GetRuleGroup(req.Context(), userID, namespace, groupName) @@ -462,11 +467,12 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req 
*http.Request) { userID, namespace, _, err := parseRequest(req, true, false) if err != nil { respondError(logger, w, err.Error()) + return } payload, err := ioutil.ReadAll(req.Body) if err != nil { - level.Error(logger).Log("err", err.Error()) + level.Error(logger).Log("msg", "unable to read rule group payload", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -476,14 +482,16 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { rg := rulefmt.RuleGroup{} err = yaml.Unmarshal(payload, &rg) if err != nil { - level.Error(logger).Log("err", err.Error()) + level.Error(logger).Log("msg", "unable to unmarshal rule group payload", "err", err.Error()) http.Error(w, ErrBadRuleGroup.Error(), http.StatusBadRequest) return } errs := ValidateRuleGroup(rg) if len(errs) > 0 { - level.Error(logger).Log("err", err.Error()) + for _, err := range errs { + level.Error(logger).Log("msg", "unable to validate rule group payload", "err", err.Error()) + } http.Error(w, errs[0].Error(), http.StatusBadRequest) return } @@ -493,7 +501,7 @@ func (r *Ruler) createRuleGroup(w http.ResponseWriter, req *http.Request) { level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String()) err = r.store.SetRuleGroup(req.Context(), userID, namespace, rgProto) if err != nil { - level.Error(logger).Log("err", err.Error()) + level.Error(logger).Log("msg", "unable to store rule group", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -507,6 +515,7 @@ func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { userID, namespace, groupName, err := parseRequest(req, true, true) if err != nil { respondError(logger, w, err.Error()) + return } err = r.store.DeleteRuleGroup(req.Context(), userID, namespace, groupName) @@ -515,8 +524,7 @@ func (r *Ruler) deleteRuleGroup(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusNotFound) 
return } - level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) + respondError(logger, w, err.Error()) return }