diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ee9b95e2..5ac8c599 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -129,7 +129,7 @@ jobs: - name: Setup sqlc uses: sqlc-dev/setup-sqlc@v4 with: - sqlc-version: "1.27.0" + sqlc-version: "1.28.0" - name: Run sqlc diff run: | diff --git a/.storybook/preview.tsx b/.storybook/preview.tsx index ce92d4d0..37bb1353 100644 --- a/.storybook/preview.tsx +++ b/.storybook/preview.tsx @@ -1,5 +1,4 @@ -import type { Preview } from "@storybook/react"; -import type { PartialStoryFn, StoryContext } from "@storybook/types"; +import type { Decorator, Preview } from "@storybook/react"; import { withThemeByClassName } from "@storybook/addon-themes"; import { ReactRenderer } from "@storybook/react"; @@ -13,27 +12,52 @@ import { import { ThemeProvider } from "next-themes"; import React from "react"; +import type { Features } from "../src/services/features"; + import "../src/global-type-overrides"; import "../src/index.css"; +import { FeaturesContext } from "../src/contexts/Features"; + +/** + * Decorator that provides feature flags to stories + * Can be overridden per story using parameters.features + */ +export const withFeatures: Decorator = (StoryFn, context) => { + // Default features with story-specific overrides + const features = { + hasProducerTable: true, + ...context.parameters?.features, + }; + + return ( + + + + ); +}; -function withRouter(Story: PartialStoryFn, { parameters }: StoryContext) { +/** + * Decorator that provides router context for stories + * Can be configured per story using parameters.router + */ +export const withRouter: Decorator = (StoryFn, context) => { const { initialEntries = ["/"], initialIndex, routes = ["/"], - } = parameters?.router || {}; + } = context.parameters?.router || {}; + // Create a router instance only when needed const rootRoute = createRootRoute(); - - const children = routes.map((path) => + const routeComponents = routes.map((path) => 
createRoute({ - component: Story, + component: () => , getParentRoute: () => rootRoute, path, }), ); - rootRoute.addChildren(children); + rootRoute.addChildren(routeComponents); const router = createRouter({ history: createMemoryHistory({ initialEntries, initialIndex }), @@ -41,10 +65,21 @@ function withRouter(Story: PartialStoryFn, { parameters }: StoryContext) { }); return ; -} +}; + +/** + * Decorator for theme provider + */ +export const withThemeProvider: Decorator = (StoryFn) => ( + + + +); -declare module "@storybook/types" { +// Define parameter types +declare module "@storybook/react" { interface Parameters { + features?: Partial; router?: { initialEntries?: string[]; initialIndex?: number; @@ -55,6 +90,7 @@ declare module "@storybook/types" { const preview: Preview = { decorators: [ + withFeatures, withRouter, withThemeByClassName({ defaultTheme: "light", @@ -63,11 +99,7 @@ const preview: Preview = { light: "light", }, }), - (Story) => ( - - - - ), + withThemeProvider, ], parameters: { diff --git a/CHANGELOG.md b/CHANGELOG.md index 20be15b0..9908b8a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added a queue detail page with the ability to view queue stats. For River Pro customers, this page offers the ability to dynamically override concurrency limits and to view individual clients for each queue, along with how many jobs each is working. [PR #326](https://github.com/riverqueue/riverui/pull/326). 
+ ## [v0.8.1] - 2025-02-27 ### Changed diff --git a/common_test.go b/common_test.go index efc533eb..e39e0ccf 100644 --- a/common_test.go +++ b/common_test.go @@ -62,3 +62,15 @@ func requireAPIError[TError error](t *testing.T, expectedErr TError, err error) require.ErrorAs(t, err, &apiErr) require.Equal(t, expectedErr, apiErr) } + +const producerSchema = `CREATE UNLOGGED TABLE IF NOT EXISTS river_producer ( + id bigserial PRIMARY KEY, + client_id text NOT NULL, + queue_name text NOT NULL, + + max_workers int NOT NULL CHECK (max_workers >= 0), + metadata jsonb NOT NULL DEFAULT '{}', + paused_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +);` diff --git a/go.mod b/go.mod index fedff26d..2621c27f 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,12 @@ toolchain go1.24.1 require ( github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.4 - github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf - github.com/riverqueue/river v0.20.1 - github.com/riverqueue/river/riverdriver v0.20.1 - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1 - github.com/riverqueue/river/rivershared v0.20.1 - github.com/riverqueue/river/rivertype v0.20.1 + github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 + github.com/riverqueue/river v0.20.2 + github.com/riverqueue/river/riverdriver v0.20.2 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2 + github.com/riverqueue/river/rivershared v0.20.2 + github.com/riverqueue/river/rivertype v0.20.2 github.com/rs/cors v1.11.1 github.com/samber/slog-http v1.6.0 github.com/stretchr/testify v1.10.0 @@ -39,9 +39,9 @@ require ( go.uber.org/goleak v1.3.0 // indirect golang.org/x/crypto v0.35.0 // indirect golang.org/x/net v0.36.0 // indirect - golang.org/x/sync v0.12.0 // indirect + golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/text v0.24.0 // indirect gopkg.in/yaml.v3 
v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4cde23b8..b8c99d69 100644 --- a/go.sum +++ b/go.sum @@ -35,20 +35,20 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf h1:y0ZXBnVCUuqKNhld/VVIix5pYVPjzOZu3J48wDpatSU= -github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA= -github.com/riverqueue/river v0.20.1 h1:eKf4gbPJF632LLoEPIMMEnP9I79aWWDb9k1avUHXfIA= -github.com/riverqueue/river v0.20.1/go.mod h1:1RVre4dwkRznCZSgz1NeW9HVqeV2MFcRbpi89rvIaYE= -github.com/riverqueue/river/riverdriver v0.20.1 h1:Iz5DXbHFrt32iFv0DRpk2Td1HcyR2z4VrhC9CA9dsoI= -github.com/riverqueue/river/riverdriver v0.20.1/go.mod h1:Q8MbNY6uuQEtozC/dLJ2HRenCZrEQn2K5V1/yYHoK9I= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.1 h1:C5XxNpZ365YGYv+nUIbSZynyVW+hPBo7CggsE8S3eIw= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.1/go.mod h1:IxJ4+ZTqlMVrA1rcbLuiSwg4qlXfyiRnZnmoz+phbNg= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1 h1:66ZntyF9i1HsIpPMXO8urhie1hPcqBbz0R31CPWgTXM= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1/go.mod h1:CJ6LYk3q0s/nUVzadLXQIpUDHi0hhPg9a8GAzqSq9P8= -github.com/riverqueue/river/rivershared v0.20.1 h1:49EKGZ1jtT6kgsoX5jX9+Cr/v8NB2xZAAUVvE6Q0lQg= -github.com/riverqueue/river/rivershared v0.20.1/go.mod h1:M2j13k2UlimNtU2z7iYJEoY7x0Zvp2T+q1pW/qoWzaQ= -github.com/riverqueue/river/rivertype v0.20.1 h1:9kx3vyfYm5Cn3MZLqfmCwwhpPqE10zCBXAL6UstmbY4= -github.com/riverqueue/river/rivertype v0.20.1/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs= +github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 
h1:ejJogJ57bF+jMbvGjZQ6H6LR0NCTDQr30SJ/wSVepgs= +github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4/go.mod h1:6aXA9FSXKkxwjbOUSXdrIOuw478Lvtz/eEu45R4MoQk= +github.com/riverqueue/river v0.20.2 h1:GU34ZcC6B3TUCJf7G9sOSURKzgHZf1Vxd3RJCxbsX68= +github.com/riverqueue/river v0.20.2/go.mod h1:xbycGcRu2+RpoVm4hWQA6Ed7Ef6riFu3xJEZx3nHNHQ= +github.com/riverqueue/river/riverdriver v0.20.2 h1:FDmWALB6DvYBBw479euIBg1KClxPmDpWjmZbhScxSBw= +github.com/riverqueue/river/riverdriver v0.20.2/go.mod h1:vYSv6ZTEFWT0JVuGCwZDxJdc2U7ZMkwJQ+nPsa7/2mM= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2 h1:llBsU1hpKyIIzZroeVjM7uavmq3W+kXuSvkUCQ/3pg4= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2/go.mod h1:qPJ5qkfAqAYRKXxU1TNFsVwMd9dLIXEFDLrrGz6GAWM= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2 h1:O8e1vobbKhUmgbki0mLOvCptixMtBiMjJgkGPa4VFAY= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2/go.mod h1:zn3Lf6qzkq9kEOzYRe/fEgYl9c/eRTCdwBHtclxILEU= +github.com/riverqueue/river/rivershared v0.20.2 h1:mrZV66L7PQyR+y0o7JMsZbdT+aG3SAVRQ7AB58mGbxU= +github.com/riverqueue/river/rivershared v0.20.2/go.mod h1:8B1yIue4a/Qb5efwo9qpbTEnYCQhZAa9NZn6pdM381o= +github.com/riverqueue/river/rivertype v0.20.2 h1:unmiQP7CWS6IDbDrp9cESNscPoMstxb6Luoz9kfNzOc= +github.com/riverqueue/river/rivertype v0.20.2/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -82,12 +82,12 @@ golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= 
-golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/handler.go b/handler.go index 55d6357b..39fa43f0 100644 --- a/handler.go +++ b/handler.go @@ -19,6 +19,7 @@ import ( "github.com/riverqueue/apiframe/apiendpoint" "github.com/riverqueue/apiframe/apimiddleware" + "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/startstop" @@ -136,20 +137,28 @@ func NewServer(opts *ServerOpts) (*Server, error) { mux := http.NewServeMux() + mountOpts := apiendpoint.MountOpts{ + Logger: opts.Logger, + Validator: apitype.NewValidator(), + } + endpoints := []apiendpoint.EndpointInterface{ - apiendpoint.Mount(mux, opts.Logger, newHealthCheckGetEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newJobCancelEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newJobDeleteEndpoint(apiBundle)), - apiendpoint.Mount(mux, 
opts.Logger, newJobListEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newJobRetryEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newJobGetEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newQueueGetEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newQueueListEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newQueuePauseEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newQueueResumeEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newStateAndCountGetEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newWorkflowGetEndpoint(apiBundle)), - apiendpoint.Mount(mux, opts.Logger, newWorkflowListEndpoint(apiBundle)), + apiendpoint.Mount(mux, newFeaturesGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newHealthCheckGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newJobCancelEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newJobDeleteEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newJobGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newJobListEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newJobRetryEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newProducerListEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newQueueGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newQueueListEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newQueuePauseEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newQueueResumeEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newQueueUpdateEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newStateAndCountGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newWorkflowGetEndpoint(apiBundle), &mountOpts), + apiendpoint.Mount(mux, newWorkflowListEndpoint(apiBundle), &mountOpts), } var services []startstop.Service diff --git a/handler_api_endpoint.go b/handler_api_endpoint.go index a4358c0d..298f61f9 100644 --- 
a/handler_api_endpoint.go +++ b/handler_api_endpoint.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "net/http" + "slices" "strconv" "time" @@ -15,6 +16,7 @@ import ( "github.com/riverqueue/apiframe/apiendpoint" "github.com/riverqueue/apiframe/apierror" + "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/startstop" @@ -54,6 +56,65 @@ type statusResponse struct { var statusResponseOK = &statusResponse{Status: "ok"} //nolint:gochecknoglobals +// +// featuresGetEndpoint +// + +type featuresGetEndpoint struct { + apiBundle + apiendpoint.Endpoint[featuresGetRequest, featuresGetResponse] +} + +func newFeaturesGetEndpoint(apiBundle apiBundle) *featuresGetEndpoint { + return &featuresGetEndpoint{apiBundle: apiBundle} +} + +func (*featuresGetEndpoint) Meta() *apiendpoint.EndpointMeta { + return &apiendpoint.EndpointMeta{ + Pattern: "GET /api/features", + StatusCode: http.StatusOK, + } +} + +type featuresGetRequest struct{} + +type featuresGetResponse struct { + HasClientTable bool `json:"has_client_table"` + HasProducerTable bool `json:"has_producer_table"` + HasWorkflows bool `json:"has_workflows"` +} + +func (a *featuresGetEndpoint) Execute(ctx context.Context, _ *featuresGetRequest) (*featuresGetResponse, error) { + hasClientTable, err := dbsqlc.New().TableExistsInCurrentSchema(ctx, a.dbPool, "river_client") + if err != nil { + return nil, err + } + + hasProducerTable, err := dbsqlc.New().TableExistsInCurrentSchema(ctx, a.dbPool, "river_producer") + if err != nil { + return nil, err + } + + indexResults, err := dbsqlc.New().IndexesExistInCurrentSchema(ctx, a.dbPool, []string{ + "river_job_workflow_list_active", + "river_job_workflow_scheduling", + }) + if err != nil { + return nil, err + } + + indexResultsMap := make(map[string]bool, len(indexResults)) + for _, indexResult := range indexResults { + indexResultsMap[indexResult.IndexNamesIndexName] = 
indexResult.Exists + } + + return &featuresGetResponse{ + HasClientTable: hasClientTable, + HasProducerTable: hasProducerTable, + HasWorkflows: indexResultsMap["river_job_workflow_list_active"] || indexResultsMap["river_job_workflow_scheduling"], + }, nil +} + // // healthCheckGetEndpoint // @@ -354,6 +415,49 @@ func (a *jobRetryEndpoint) Execute(ctx context.Context, req *jobRetryRequest) (* }) } +// +// producerListEndpoint +// + +type producerListEndpoint struct { + apiBundle + apiendpoint.Endpoint[producerListRequest, listResponse[RiverProducer]] +} + +func newProducerListEndpoint(apiBundle apiBundle) *producerListEndpoint { + return &producerListEndpoint{apiBundle: apiBundle} +} + +func (*producerListEndpoint) Meta() *apiendpoint.EndpointMeta { + return &apiendpoint.EndpointMeta{ + Pattern: "GET /api/producers", + StatusCode: http.StatusOK, + } +} + +type producerListRequest struct { + QueueName string `json:"-" validate:"required"` // from ExtractRaw +} + +func (req *producerListRequest) ExtractRaw(r *http.Request) error { + if queueName := r.URL.Query().Get("queue_name"); queueName != "" { + req.QueueName = queueName + } + + return nil +} + +func (a *producerListEndpoint) Execute(ctx context.Context, req *producerListRequest) (*listResponse[RiverProducer], error) { + return pgxutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*listResponse[RiverProducer], error) { + result, err := dbsqlc.New().ProducerListByQueue(ctx, tx, req.QueueName) + if err != nil { + return nil, fmt.Errorf("error listing producers: %w", err) + } + + return listResponseFrom(sliceutil.Map(result, internalProducerToSerializableProducer)), nil + }) +} + // // queueGetEndpoint // @@ -541,6 +645,75 @@ func (a *queueResumeEndpoint) Execute(ctx context.Context, req *queueResumeReque }) } +type queueUpdateEndpoint struct { + apiBundle + apiendpoint.Endpoint[queueUpdateRequest, RiverQueue] +} + +func newQueueUpdateEndpoint(apiBundle apiBundle) *queueUpdateEndpoint { + return 
&queueUpdateEndpoint{apiBundle: apiBundle} +} + +func (*queueUpdateEndpoint) Meta() *apiendpoint.EndpointMeta { + return &apiendpoint.EndpointMeta{ + Pattern: "PATCH /api/queues/{name}", + StatusCode: http.StatusOK, + } +} + +type queueUpdateRequest struct { + Concurrency apitype.ExplicitNullable[ConcurrencyConfig] `json:"concurrency"` + Name string `json:"-" validate:"required"` // from ExtractRaw +} + +func (req *queueUpdateRequest) ExtractRaw(r *http.Request) error { + req.Name = r.PathValue("name") + return nil +} + +func (a *queueUpdateEndpoint) Execute(ctx context.Context, req *queueUpdateRequest) (*RiverQueue, error) { + return pgxutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*RiverQueue, error) { + // Construct metadata based on concurrency field + var metadata json.RawMessage + if req.Concurrency.Set { + if req.Concurrency.Value == nil { + // If concurrency is nil, clear the metadata + metadata = []byte("{}") + } else { + // Ensure consistent sorting of ByArgs: + slices.Sort(req.Concurrency.Value.Partition.ByArgs) + + // Otherwise, construct metadata with the concurrency config + metadataStruct := map[string]interface{}{ + "concurrency": req.Concurrency.Value, + } + var err error + metadata, err = json.Marshal(metadataStruct) + if err != nil { + return nil, fmt.Errorf("error marshaling metadata: %w", err) + } + } + } + + queue, err := a.client.QueueUpdateTx(ctx, tx, req.Name, &river.QueueUpdateParams{ + Metadata: metadata, + }) + if err != nil { + if errors.Is(err, river.ErrNotFound) { + return nil, NewNotFoundQueue(req.Name) + } + return nil, fmt.Errorf("error updating queue metadata: %w", err) + } + + countRows, err := dbsqlc.New().JobCountByQueueAndState(ctx, a.dbPool, []string{req.Name}) + if err != nil { + return nil, fmt.Errorf("error getting queue counts: %w", err) + } + + return riverQueueToSerializableQueue(*queue, countRows[0]), nil + }) +} + // // stateAndCountGetEndpoint // @@ -770,6 +943,17 @@ func NewNotFoundWorkflow(id 
string) *apierror.NotFound { return apierror.NewNotFoundf("Workflow not found: %s.", id) } +type ConcurrencyConfig struct { + GlobalLimit int32 `json:"global_limit"` + LocalLimit int32 `json:"local_limit"` + Partition PartitionConfig `json:"partition"` +} + +type PartitionConfig struct { + ByArgs []string `json:"by_args"` + ByKind bool `json:"by_kind"` +} + type RiverJob struct { ID int64 `json:"id"` Args json.RawMessage `json:"args"` @@ -853,22 +1037,73 @@ func riverJobToSerializableJob(riverJob *rivertype.JobRow) *RiverJob { } } +type RiverProducer struct { + ID int64 `json:"id"` + ClientID string `json:"client_id"` + Concurrency *ConcurrencyConfig `json:"concurrency"` + CreatedAt time.Time `json:"created_at"` + MaxWorkers int `json:"max_workers"` + PausedAt *time.Time `json:"paused_at"` + QueueName string `json:"queue_name"` + Running int32 `json:"running"` + UpdatedAt time.Time `json:"updated_at"` +} + +func internalProducerToSerializableProducer(internal *dbsqlc.ProducerListByQueueRow) *RiverProducer { + var pausedAt *time.Time + if internal.RiverProducer.PausedAt.Valid { + pausedAt = &internal.RiverProducer.PausedAt.Time + } + + var concurrency *ConcurrencyConfig + if len(internal.RiverProducer.Metadata) > 0 { + var metadata struct { + Concurrency ConcurrencyConfig `json:"concurrency"` + } + if err := json.Unmarshal(internal.RiverProducer.Metadata, &metadata); err == nil { + concurrency = &metadata.Concurrency + } + } + + return &RiverProducer{ + ID: internal.RiverProducer.ID, + ClientID: internal.RiverProducer.ClientID, + Concurrency: concurrency, + CreatedAt: internal.RiverProducer.CreatedAt.Time, + MaxWorkers: int(internal.RiverProducer.MaxWorkers), + PausedAt: pausedAt, + QueueName: internal.RiverProducer.QueueName, + Running: internal.Running, + UpdatedAt: internal.RiverProducer.UpdatedAt.Time, + } +} + type RiverQueue struct { - CountAvailable int `json:"count_available"` - CountRunning int `json:"count_running"` - CreatedAt time.Time 
`json:"created_at"` - Metadata json.RawMessage `json:"metadata"` - Name string `json:"name"` - PausedAt *time.Time `json:"paused_at"` - UpdatedAt time.Time `json:"updated_at"` + CountAvailable int `json:"count_available"` + CountRunning int `json:"count_running"` + CreatedAt time.Time `json:"created_at"` + Concurrency *ConcurrencyConfig `json:"concurrency"` + Name string `json:"name"` + PausedAt *time.Time `json:"paused_at"` + UpdatedAt time.Time `json:"updated_at"` } func riverQueueToSerializableQueue(internal rivertype.Queue, count *dbsqlc.JobCountByQueueAndStateRow) *RiverQueue { + var concurrency *ConcurrencyConfig + if len(internal.Metadata) > 0 { + var metadata struct { + Concurrency *ConcurrencyConfig `json:"concurrency"` + } + if err := json.Unmarshal(internal.Metadata, &metadata); err == nil { + concurrency = metadata.Concurrency + } + } + return &RiverQueue{ CountAvailable: int(count.CountAvailable), CountRunning: int(count.CountRunning), CreatedAt: internal.CreatedAt, - Metadata: internal.Metadata, + Concurrency: concurrency, Name: internal.Name, PausedAt: internal.PausedAt, UpdatedAt: internal.UpdatedAt, diff --git a/handler_api_endpoint_test.go b/handler_api_endpoint_test.go index 5fa27cbf..bff84338 100644 --- a/handler_api_endpoint_test.go +++ b/handler_api_endpoint_test.go @@ -10,8 +10,10 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/riverqueue/apiframe/apiendpoint" "github.com/riverqueue/apiframe/apierror" "github.com/riverqueue/apiframe/apitest" + "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/riversharedtest" @@ -59,7 +61,62 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu } } -func TestHandlerHealthCheckGetEndpoint(t *testing.T) { +func testMountOpts(t *testing.T) *apiendpoint.MountOpts { + t.Helper() + return &apiendpoint.MountOpts{ + Logger: 
riverinternaltest.Logger(t), + Validator: apitype.NewValidator(), + } +} + +func TestAPIHandlerFeaturesGet(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + ctx := context.Background() + + t.Run("SuccessWithEverythingFalse", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + endpoint, bundle := setupEndpoint(ctx, t, newFeaturesGetEndpoint) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_producer;`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_client CASCADE;`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_list_active;`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_scheduling;`) + require.NoError(t, err) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &featuresGetRequest{}) + require.NoError(t, err) + require.Equal(t, &featuresGetResponse{ + HasClientTable: false, + HasProducerTable: false, + HasWorkflows: false, + }, resp) + }) + + t.Run("SuccessWithEverythingTrue", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. 
+ endpoint, bundle := setupEndpoint(ctx, t, newFeaturesGetEndpoint) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_client (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_producer (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `CREATE INDEX IF NOT EXISTS river_job_workflow_list_active ON river_job ((metadata->>'workflow_id'));`) + require.NoError(t, err) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &featuresGetRequest{}) + require.NoError(t, err) + require.Equal(t, &featuresGetResponse{ + HasClientTable: true, + HasProducerTable: true, + HasWorkflows: true, + }, resp) + }) +} + +func TestAPIHandlerHealthCheckGet(t *testing.T) { t.Parallel() ctx := context.Background() @@ -69,7 +126,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameComplete}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &healthCheckGetRequest{Name: healthCheckNameComplete}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) }) @@ -82,7 +139,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) { // Roll back prematurely so we get a database error. require.NoError(t, bundle.tx.Rollback(ctx)) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameComplete}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &healthCheckGetRequest{Name: healthCheckNameComplete}) requireAPIError(t, apierror.WithInternalError( apierror.NewServiceUnavailable("Unable to query database. 
Check logs for details."), pgx.ErrTxClosed, @@ -94,7 +151,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameMinimal}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &healthCheckGetRequest{Name: healthCheckNameMinimal}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) }) @@ -104,12 +161,12 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: "other"}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &healthCheckGetRequest{Name: "other"}) requireAPIError(t, apierror.NewNotFoundf("Health check %q not found. Use either `complete` or `minimal`.", "other"), err) }) } -func TestJobCancelEndpoint(t *testing.T) { +func TestAPIHandlerJobCancel(t *testing.T) { t.Parallel() ctx := context.Background() @@ -122,7 +179,7 @@ func TestJobCancelEndpoint(t *testing.T) { job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobCancelRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobCancelRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) @@ -140,12 +197,12 @@ func TestJobCancelEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newJobCancelEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobCancelRequest{JobIDs: []int64String{123}}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), 
&jobCancelRequest{JobIDs: []int64String{123}}) requireAPIError(t, NewNotFoundJob(123), err) }) } -func TestJobDeleteEndpoint(t *testing.T) { +func TestAPIHandlerJobDelete(t *testing.T) { t.Parallel() ctx := context.Background() @@ -158,7 +215,7 @@ func TestJobDeleteEndpoint(t *testing.T) { job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobDeleteRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobDeleteRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) @@ -174,12 +231,12 @@ func TestJobDeleteEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newJobDeleteEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobDeleteRequest{JobIDs: []int64String{123}}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobDeleteRequest{JobIDs: []int64String{123}}) requireAPIError(t, NewNotFoundJob(123), err) }) } -func TestJobGetEndpoint(t *testing.T) { +func TestAPIHandlerJobGet(t *testing.T) { t.Parallel() ctx := context.Background() @@ -191,7 +248,7 @@ func TestJobGetEndpoint(t *testing.T) { job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobGetRequest{JobID: job.ID}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobGetRequest{JobID: job.ID}) require.NoError(t, err) require.Equal(t, job.ID, resp.ID) }) @@ -201,7 +258,7 @@ func TestJobGetEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newJobGetEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobGetRequest{JobID: 123}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), 
&jobGetRequest{JobID: 123}) requireAPIError(t, NewNotFoundJob(123), err) }) } @@ -227,7 +284,7 @@ func TestAPIHandlerJobList(t *testing.T) { _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStatePending)}) _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobListRequest{}) require.NoError(t, err) require.Len(t, resp.Data, 2) require.Equal(t, job1.ID, resp.Data[0].ID) @@ -242,7 +299,7 @@ func TestAPIHandlerJobList(t *testing.T) { job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{Limit: ptrutil.Ptr(1)}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobListRequest{Limit: ptrutil.Ptr(1)}) require.NoError(t, err) require.Len(t, resp.Data, 1) require.Equal(t, job1.ID, resp.Data[0].ID) @@ -259,7 +316,7 @@ func TestAPIHandlerJobList(t *testing.T) { // Other states excluded. _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateCompleted)}) require.NoError(t, err) require.Len(t, resp.Data, 2) require.Equal(t, job2.ID, resp.Data[0].ID) // order inverted @@ -277,7 +334,7 @@ func TestAPIHandlerJobList(t *testing.T) { // Other states excluded. 
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) require.NoError(t, err) require.Len(t, resp.Data, 2) require.Equal(t, job1.ID, resp.Data[0].ID) @@ -285,7 +342,7 @@ func TestAPIHandlerJobList(t *testing.T) { }) } -func TestJobRetryEndpoint(t *testing.T) { +func TestAPIHandlerJobRetry(t *testing.T) { t.Parallel() ctx := context.Background() @@ -304,7 +361,7 @@ func TestJobRetryEndpoint(t *testing.T) { State: ptrutil.Ptr(rivertype.JobStateDiscarded), }) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobRetryRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobRetryRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) @@ -322,11 +379,60 @@ func TestJobRetryEndpoint(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newJobRetryEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobRetryRequest{JobIDs: []int64String{123}}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &jobRetryRequest{JobIDs: []int64String{123}}) requireAPIError(t, NewNotFoundJob(123), err) }) } +func TestAPIHandlerProducerList(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, newProducerListEndpoint) + + _, err := bundle.tx.Exec(ctx, producerSchema) + require.NoError(t, err) + + _, err = bundle.tx.Exec(ctx, `INSERT INTO river_producer (client_id, queue_name, max_workers, metadata) VALUES ('client1', 'queue1', 1, 
'{}');`) + require.NoError(t, err) + + _, err = bundle.tx.Exec(ctx, `INSERT INTO river_producer (client_id, queue_name, max_workers, metadata) VALUES ('client2', 'queue1', 2, '{}');`) + require.NoError(t, err) + + _, err = bundle.tx.Exec(ctx, + `INSERT INTO river_producer (client_id, queue_name, max_workers, metadata) VALUES ( + 'client2', 'queue2', 3, '{"concurrency": {"running": {"abcd": {"count": 3}, "efgh": {"count": 2}}}}' + );`, + ) + require.NoError(t, err) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &producerListRequest{QueueName: "queue1"}) + require.NoError(t, err) + require.Equal(t, 2, len(resp.Data)) + require.Equal(t, "client1", resp.Data[0].ClientID) + require.Equal(t, 1, resp.Data[0].MaxWorkers) + require.Equal(t, int32(0), resp.Data[0].Running) + require.Equal(t, "client2", resp.Data[1].ClientID) + require.Equal(t, 2, resp.Data[1].MaxWorkers) + require.Equal(t, int32(0), resp.Data[1].Running) + + resp, err = apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &producerListRequest{QueueName: "queue2"}) + require.NoError(t, err) + require.Equal(t, 1, len(resp.Data)) + require.Equal(t, "client2", resp.Data[0].ClientID) + require.Equal(t, 3, resp.Data[0].MaxWorkers) + require.Equal(t, int32(5), resp.Data[0].Running) + + resp, err = apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &producerListRequest{QueueName: "queue3"}) + require.NoError(t, err) + require.Equal(t, 0, len(resp.Data)) + }) +} + func TestAPIHandlerQueueGet(t *testing.T) { t.Parallel() @@ -342,7 +448,7 @@ func TestAPIHandlerQueueGet(t *testing.T) { _, err := bundle.client.InsertTx(ctx, bundle.tx, &noOpArgs{}, &river.InsertOpts{Queue: queue.Name}) require.NoError(t, err) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueGetRequest{Name: queue.Name}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueGetRequest{Name: queue.Name}) require.NoError(t, err) require.Equal(t, 1, 
resp.CountAvailable) require.Equal(t, queue.Name, resp.Name) @@ -353,7 +459,7 @@ func TestAPIHandlerQueueGet(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newQueueGetEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueGetRequest{Name: "does_not_exist"}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueGetRequest{Name: "does_not_exist"}) requireAPIError(t, NewNotFoundQueue("does_not_exist"), err) }) } @@ -374,7 +480,7 @@ func TestAPIHandlerQueueList(t *testing.T) { _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &queue1.Name}) _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &queue2.Name}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueListRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueListRequest{}) require.NoError(t, err) require.Len(t, resp.Data, 2) require.Equal(t, 1, resp.Data[0].CountAvailable) @@ -391,7 +497,7 @@ func TestAPIHandlerQueueList(t *testing.T) { queue1 := testfactory.Queue(ctx, t, bundle.exec, nil) _ = testfactory.Queue(ctx, t, bundle.exec, nil) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueListRequest{Limit: ptrutil.Ptr(1)}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueListRequest{Limit: ptrutil.Ptr(1)}) require.NoError(t, err) require.Len(t, resp.Data, 1) require.Equal(t, queue1.Name, resp.Data[0].Name) @@ -410,7 +516,7 @@ func TestAPIHandlerQueuePause(t *testing.T) { queue := testfactory.Queue(ctx, t, bundle.exec, nil) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queuePauseRequest{Name: queue.Name}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queuePauseRequest{Name: queue.Name}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) }) @@ -420,7 +526,7 @@ func TestAPIHandlerQueuePause(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newQueuePauseEndpoint) - 
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queuePauseRequest{Name: "does_not_exist"}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queuePauseRequest{Name: "does_not_exist"}) requireAPIError(t, NewNotFoundQueue("does_not_exist"), err) }) } @@ -439,7 +545,7 @@ func TestAPIHandlerQueueResume(t *testing.T) { PausedAt: ptrutil.Ptr(time.Now()), }) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueResumeRequest{Name: queue.Name}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueResumeRequest{Name: queue.Name}) require.NoError(t, err) require.Equal(t, statusResponseOK, resp) }) @@ -449,7 +555,79 @@ func TestAPIHandlerQueueResume(t *testing.T) { endpoint, _ := setupEndpoint(ctx, t, newQueueResumeEndpoint) - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueResumeRequest{Name: "does_not_exist"}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueResumeRequest{Name: "does_not_exist"}) + requireAPIError(t, NewNotFoundQueue("does_not_exist"), err) + }) +} + +func TestAPIHandlerQueueUpdate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, newQueueUpdateEndpoint) + + queue := testfactory.Queue(ctx, t, bundle.exec, nil) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueUpdateRequest{ + Name: queue.Name, + Concurrency: apitype.ExplicitNullable[ConcurrencyConfig]{ + Set: true, + Value: &ConcurrencyConfig{GlobalLimit: 10, LocalLimit: 5}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, queue.Name, resp.Name) + require.Equal(t, &ConcurrencyConfig{ + GlobalLimit: 10, + LocalLimit: 5, + }, resp.Concurrency) + }) + + t.Run("SortsPartitionByArgs", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, newQueueUpdateEndpoint) + + queue := 
testfactory.Queue(ctx, t, bundle.exec, nil) + + // Create unsorted ByArgs array + unsortedArgs := []string{"z", "c", "a", "b"} + sortedArgs := []string{"a", "b", "c", "z"} // same array but sorted + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueUpdateRequest{ + Name: queue.Name, + Concurrency: apitype.ExplicitNullable[ConcurrencyConfig]{ + Set: true, + Value: &ConcurrencyConfig{ + GlobalLimit: 10, + LocalLimit: 5, + Partition: PartitionConfig{ + ByArgs: unsortedArgs, + ByKind: true, + }, + }, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, queue.Name, resp.Name) + require.NotNil(t, resp.Concurrency) + require.Equal(t, sortedArgs, resp.Concurrency.Partition.ByArgs) + }) + + t.Run("NotFound", func(t *testing.T) { + t.Parallel() + + endpoint, _ := setupEndpoint(ctx, t, newQueueUpdateEndpoint) + + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &queueUpdateRequest{ + Name: "does_not_exist", + }) requireAPIError(t, NewNotFoundQueue("does_not_exist"), err) }) } @@ -494,7 +672,7 @@ func TestStateAndCountGetEndpoint(t *testing.T) { _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) } - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &stateAndCountGetRequest{}) require.NoError(t, err) require.Equal(t, &stateAndCountGetResponse{ Available: 1, @@ -521,7 +699,7 @@ func TestStateAndCountGetEndpoint(t *testing.T) { _, err := endpoint.queryCacher.RunQuery(ctx) require.NoError(t, err) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &stateAndCountGetRequest{}) require.NoError(t, err) require.Equal(t, &stateAndCountGetResponse{ Available: queryCacheSkipThreshold + 1, @@ -541,7 +719,7 @@ func 
TestStateAndCountGetEndpoint(t *testing.T) { _, err := endpoint.queryCacher.RunQuery(ctx) require.NoError(t, err) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &stateAndCountGetRequest{}) require.NoError(t, err) require.Equal(t, &stateAndCountGetResponse{ Available: queryCacheSkipThreshold - 1, @@ -563,7 +741,7 @@ func TestAPIHandlerWorkflowGet(t *testing.T) { job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: mustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID})}) job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: mustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID})}) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowGetRequest{ID: workflowID.String()}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowGetRequest{ID: workflowID.String()}) require.NoError(t, err) require.Len(t, resp.Tasks, 2) require.Equal(t, job1.ID, resp.Tasks[0].ID) @@ -577,7 +755,7 @@ func TestAPIHandlerWorkflowGet(t *testing.T) { workflowID := uuid.New() - _, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowGetRequest{ID: workflowID.String()}) + _, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowGetRequest{ID: workflowID.String()}) requireAPIError(t, NewNotFoundWorkflow(workflowID.String()), err) }) } @@ -608,7 +786,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) { }) t.Run("All", func(t *testing.T) { - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowListRequest{}) require.NoError(t, err) require.Len(t, resp.Data, 2) require.Equal(t, 1, resp.Data[0].CountCancelled) @@ -630,7 +808,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) { }) t.Run("Active", func(t *testing.T) { - resp, 
err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{State: "active"}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowListRequest{State: "active"}) require.NoError(t, err) require.Len(t, resp.Data, 1) require.Equal(t, 0, resp.Data[0].CountAvailable) @@ -647,7 +825,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) { }) t.Run("Inactive", func(t *testing.T) { - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{State: "inactive"}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowListRequest{State: "inactive"}) require.NoError(t, err) require.Len(t, resp.Data, 1) require.Equal(t, 1, resp.Data[0].CountCompleted) @@ -672,7 +850,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) { State: ptrutil.Ptr(rivertype.JobStateScheduled), }) - resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{Limit: ptrutil.Ptr(1)}) + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &workflowListRequest{Limit: ptrutil.Ptr(1)}) require.NoError(t, err) require.Len(t, resp.Data, 1) require.Equal(t, "2", resp.Data[0].ID) // DESC order means last one gets returned diff --git a/handler_test.go b/handler_test.go index bac5439b..89b0dcf0 100644 --- a/handler_test.go +++ b/handler_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river/rivershared/util/ptrutil" "riverqueue.com/riverui/internal/riverinternaltest" @@ -88,6 +89,9 @@ func TestNewHandlerIntegration(t *testing.T) { // Test data // + _, err := tx.Exec(ctx, producerSchema) // producer table for related endpoints + require.NoError(t, err) + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{}) queue := testfactory.Queue(ctx, t, exec, nil) @@ -102,6 +106,7 @@ func TestNewHandlerIntegration(t *testing.T) { // API calls // + makeAPICall(t, "FeaturesGet", 
http.MethodGet, makeURL("/api/features"), nil) makeAPICall(t, "HealthCheckGetComplete", http.MethodGet, makeURL("/api/health-checks/%s", healthCheckNameComplete), nil) makeAPICall(t, "HealthCheckGetMinimal", http.MethodGet, makeURL("/api/health-checks/%s", healthCheckNameMinimal), nil) makeAPICall(t, "JobCancel", http.MethodPost, makeURL("/api/jobs/cancel"), mustMarshalJSON(t, &jobCancelRequest{JobIDs: []int64String{int64String(job.ID)}})) @@ -109,9 +114,21 @@ func TestNewHandlerIntegration(t *testing.T) { makeAPICall(t, "JobGet", http.MethodGet, makeURL("/api/jobs/%d", job.ID), nil) makeAPICall(t, "JobList", http.MethodGet, makeURL("/api/jobs"), nil) makeAPICall(t, "JobRetry", http.MethodPost, makeURL("/api/jobs/retry"), mustMarshalJSON(t, &jobCancelRequest{JobIDs: []int64String{int64String(job.ID)}})) + makeAPICall(t, "ProducerList", http.MethodGet, makeURL("/api/producers?queue_name=%s", queue.Name), nil) makeAPICall(t, "QueueGet", http.MethodGet, makeURL("/api/queues/%s", queue.Name), nil) makeAPICall(t, "QueueList", http.MethodGet, makeURL("/api/queues"), nil) makeAPICall(t, "QueuePause", http.MethodPut, makeURL("/api/queues/%s/pause", queue.Name), nil) + makeAPICall(t, "QueueUpdate", http.MethodPatch, makeURL("/api/queues/%s", queue.Name), mustMarshalJSON(t, &queueUpdateRequest{ + Concurrency: apitype.ExplicitNullable[ConcurrencyConfig]{ + Value: &ConcurrencyConfig{ + GlobalLimit: 10, + LocalLimit: 5, + Partition: PartitionConfig{ + ByArgs: []string{"customer_id"}, + ByKind: true, + }, + }, + }})) makeAPICall(t, "QueueResume", http.MethodPut, makeURL("/api/queues/%s/resume", queuePaused.Name), nil) makeAPICall(t, "StateAndCountGet", http.MethodGet, makeURL("/api/states"), nil) makeAPICall(t, "WorkflowGet", http.MethodGet, makeURL("/api/workflows/%s", workflowID), nil) diff --git a/internal/dbsqlc/db.go b/internal/dbsqlc/db.go index 582411a0..6b5ac0c5 100644 --- a/internal/dbsqlc/db.go +++ b/internal/dbsqlc/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. 
DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.28.0 package dbsqlc diff --git a/internal/dbsqlc/migrations/20230906143347_create_river_job.sql b/internal/dbsqlc/migrations/20230906143347_create_river_job.sql deleted file mode 100644 index 0f182621..00000000 --- a/internal/dbsqlc/migrations/20230906143347_create_river_job.sql +++ /dev/null @@ -1,109 +0,0 @@ --- +goose Up -CREATE TYPE river_job_state AS ENUM( - 'available', - 'cancelled', - 'completed', - 'discarded', - 'retryable', - 'running', - 'scheduled' -); - -CREATE TABLE river_job( - -- 8 bytes - id bigserial PRIMARY KEY, - - -- 8 bytes (4 bytes + 2 bytes + 2 bytes) - -- - -- `state` is kept near the top of the table for operator convenience -- when - -- looking at jobs with `SELECT *` it'll appear first after ID. The other two - -- fields aren't as important but are kept adjacent to `state` for alignment - -- to get an 8-byte block. - state river_job_state NOT NULL DEFAULT 'available' ::river_job_state, - attempt smallint NOT NULL DEFAULT 0, - max_attempts smallint NOT NULL, - - -- 8 bytes each (no alignment needed) - attempted_at timestamptz, - created_at timestamptz NOT NULL DEFAULT NOW(), - finalized_at timestamptz, - scheduled_at timestamptz NOT NULL DEFAULT NOW(), - - -- 2 bytes (some wasted padding probably) - priority smallint NOT NULL DEFAULT 1, - - -- types stored out-of-band - args jsonb, - attempted_by text[], - errors jsonb[], - kind text NOT NULL, - metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, - queue text NOT NULL DEFAULT 'default' ::text, - tags varchar(255)[], - - CONSTRAINT finalized_or_finalized_at_null CHECK ((state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL), - CONSTRAINT max_attempts_is_positive CHECK (max_attempts > 0), - CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), - CONSTRAINT queue_length CHECK (char_length(queue) > 0 AND char_length(queue) < 128), - CONSTRAINT kind_length CHECK (char_length(kind) 
> 0 AND char_length(kind) < 128) -); - --- We may want to consider adding another property here after `kind` if it seems --- like it'd be useful for something. -CREATE INDEX river_job_kind ON river_job USING btree(kind); - -CREATE INDEX river_job_state_and_finalized_at_index ON river_job USING btree(state, finalized_at) WHERE finalized_at IS NOT NULL; - -CREATE INDEX river_job_prioritized_fetching_index ON river_job USING btree(state, queue, priority, scheduled_at, id); - -CREATE INDEX river_job_args_index ON river_job USING GIN(args); - -CREATE INDEX river_job_metadata_index ON river_job USING GIN(metadata); - --- +goose StatementBegin -CREATE OR REPLACE FUNCTION river_job_notify() - RETURNS TRIGGER - AS $$ -DECLARE - payload json; -BEGIN - IF NEW.state = 'available' THEN - -- Notify will coalesce duplicate notifications within a transaction, so - -- keep these payloads generalized: - payload = json_build_object('queue', NEW.queue); - PERFORM - pg_notify('river_insert', payload::text); - END IF; - RETURN NULL; -END; -$$ -LANGUAGE plpgsql; --- +goose StatementEnd - -CREATE TRIGGER river_notify - AFTER INSERT ON river_job - FOR EACH ROW - EXECUTE PROCEDURE river_job_notify(); - -CREATE UNLOGGED TABLE river_leader( - -- 8 bytes each (no alignment needed) - elected_at timestamptz NOT NULL, - expires_at timestamptz NOT NULL, - - -- types stored out-of-band - leader_id text NOT NULL, - name text PRIMARY KEY, - - CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), - CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) -); - --- +goose Down --- +goose StatementBegin --- TODO(bgentry): we should have the equivalent down migrations made available in River. 
-DROP TABLE river_leader; -DROP TRIGGER river_notify ON river_job; -DROP FUNCTION river_job_notify(); -DROP TABLE river_job; -DROP TYPE river_job_state; --- +goose StatementEnd diff --git a/internal/dbsqlc/migrations/20231127194700_non_null_tags.sql b/internal/dbsqlc/migrations/20231127194700_non_null_tags.sql deleted file mode 100644 index adfc9f14..00000000 --- a/internal/dbsqlc/migrations/20231127194700_non_null_tags.sql +++ /dev/null @@ -1,10 +0,0 @@ --- +goose Up -ALTER TABLE river_job ALTER COLUMN tags SET DEFAULT '{}'; -UPDATE river_job SET tags = '{}' WHERE tags IS NULL; -ALTER TABLE river_job ALTER COLUMN tags SET NOT NULL; - --- +goose Down --- +goose StatementBegin -ALTER TABLE river_job ALTER COLUMN tags DROP NOT NULL, - ALTER COLUMN tags DROP DEFAULT; --- +goose StatementEnd diff --git a/internal/dbsqlc/migrations/20240418120000_river_v4.sql b/internal/dbsqlc/migrations/20240418120000_river_v4.sql deleted file mode 100644 index ad649147..00000000 --- a/internal/dbsqlc/migrations/20240418120000_river_v4.sql +++ /dev/null @@ -1,74 +0,0 @@ --- +goose Up --- The args column never had a NOT NULL constraint or default value at the --- database level, though we tried to ensure one at the application level. -ALTER TABLE river_job ALTER COLUMN args SET DEFAULT '{}'; -UPDATE river_job SET args = '{}' WHERE args IS NULL; -ALTER TABLE river_job ALTER COLUMN args SET NOT NULL; -ALTER TABLE river_job ALTER COLUMN args DROP DEFAULT; - --- The metadata column never had a NOT NULL constraint or default value at the --- database level, though we tried to ensure one at the application level. 
-ALTER TABLE river_job ALTER COLUMN metadata SET DEFAULT '{}'; -UPDATE river_job SET metadata = '{}' WHERE metadata IS NULL; -ALTER TABLE river_job ALTER COLUMN metadata SET NOT NULL; - --- The 'pending' job state will be used for upcoming functionality: -ALTER TYPE river_job_state ADD VALUE 'pending' AFTER 'discarded'; - -ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; -ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( - (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR - (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) -); - -DROP TRIGGER river_notify ON river_job; -DROP FUNCTION river_job_notify; - -CREATE TABLE river_queue( - name text PRIMARY KEY NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, - paused_at timestamptz, - updated_at timestamptz NOT NULL -); - --- +goose Down --- +goose StatementBegin -ALTER TABLE river_job ALTER COLUMN args DROP NOT NULL; - -ALTER TABLE river_job ALTER COLUMN metadata DROP NOT NULL; -ALTER TABLE river_job ALTER COLUMN metadata DROP DEFAULT; - --- It is not possible to safely remove 'pending' from the river_job_state enum, --- so leave it in place. 
- -ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; -ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( - (state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL -); - -CREATE OR REPLACE FUNCTION river_job_notify() - RETURNS TRIGGER - AS $$ -DECLARE - payload json; -BEGIN - IF NEW.state = 'available' THEN - -- Notify will coalesce duplicate notifications within a transaction, so - -- keep these payloads generalized: - payload = json_build_object('queue', NEW.queue); - PERFORM - pg_notify('river_insert', payload::text); - END IF; - RETURN NULL; -END; -$$ -LANGUAGE plpgsql; - -CREATE TRIGGER river_notify - AFTER INSERT ON river_job - FOR EACH ROW - EXECUTE PROCEDURE river_job_notify(); - -DROP TABLE river_queue; --- +goose StatementEnd diff --git a/internal/dbsqlc/models.go b/internal/dbsqlc/models.go index 0c0dde59..dcd3a3cb 100644 --- a/internal/dbsqlc/models.go +++ b/internal/dbsqlc/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.27.0 +// sqlc v1.28.0 package dbsqlc @@ -59,23 +59,44 @@ func (ns NullRiverJobState) Value() (driver.Value, error) { return string(ns.RiverJobState), nil } +type RiverClient struct { + ID string + CreatedAt pgtype.Timestamptz + Metadata []byte + PausedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz +} + +type RiverClientQueue struct { + RiverClientID string + Name string + CreatedAt pgtype.Timestamptz + MaxWorkers int64 + Metadata []byte + NumJobsCompleted int64 + NumJobsRunning int64 + UpdatedAt pgtype.Timestamptz +} + type RiverJob struct { - ID int64 - State RiverJobState - Attempt int16 - MaxAttempts int16 - AttemptedAt pgtype.Timestamptz - CreatedAt pgtype.Timestamptz - FinalizedAt pgtype.Timestamptz - ScheduledAt pgtype.Timestamptz - Priority int16 - Args []byte - AttemptedBy []string - Errors [][]byte - Kind string - Metadata []byte - Queue string - Tags []string + ID int64 + Args []byte + Attempt int16 + AttemptedAt pgtype.Timestamptz + AttemptedBy []string + CreatedAt pgtype.Timestamptz + Errors [][]byte + FinalizedAt pgtype.Timestamptz + Kind string + MaxAttempts int16 + Metadata []byte + Priority int16 + Queue string + State RiverJobState + ScheduledAt pgtype.Timestamptz + Tags []string + UniqueKey []byte + UniqueStates pgtype.Bits } type RiverLeader struct { @@ -85,6 +106,23 @@ type RiverLeader struct { Name string } +type RiverMigration struct { + Line string + Version int64 + CreatedAt pgtype.Timestamptz +} + +type RiverProducer struct { + ID int64 + ClientID string + QueueName string + MaxWorkers int32 + Metadata []byte + PausedAt pgtype.Timestamptz + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz +} + type RiverQueue struct { Name string CreatedAt pgtype.Timestamptz diff --git a/internal/dbsqlc/pg_misc.sql b/internal/dbsqlc/pg_misc.sql new file mode 100644 index 00000000..3aac89d7 --- /dev/null +++ b/internal/dbsqlc/pg_misc.sql @@ -0,0 +1,32 @@ +-- name: IndexesExistInCurrentSchema :many +WITH index_names 
AS ( + SELECT unnest(@index_names::text[]) as index_name +) +SELECT index_names.index_name::text, + EXISTS ( + SELECT 1 + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = current_schema() + AND c.relname = index_names.index_name + AND c.relkind = 'i' + ) AS exists +FROM index_names; + +-- name: IndexExistsInCurrentSchema :one +SELECT EXISTS ( + SELECT 1 + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = current_schema() + AND c.relname = @index_name::text + AND c.relkind = 'i' +); + +-- name: TableExistsInCurrentSchema :one +SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = current_schema() + AND table_name = @table_name::text +); diff --git a/internal/dbsqlc/pg_misc.sql.go b/internal/dbsqlc/pg_misc.sql.go new file mode 100644 index 00000000..51d1b92d --- /dev/null +++ b/internal/dbsqlc/pg_misc.sql.go @@ -0,0 +1,85 @@ +// Code generated by sqlc. DO NOT EDIT. 
+// versions: +// sqlc v1.28.0 +// source: pg_misc.sql + +package dbsqlc + +import ( + "context" +) + +const indexExistsInCurrentSchema = `-- name: IndexExistsInCurrentSchema :one +SELECT EXISTS ( + SELECT 1 + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = current_schema() + AND c.relname = $1::text + AND c.relkind = 'i' +) +` + +func (q *Queries) IndexExistsInCurrentSchema(ctx context.Context, db DBTX, indexName string) (bool, error) { + row := db.QueryRow(ctx, indexExistsInCurrentSchema, indexName) + var exists bool + err := row.Scan(&exists) + return exists, err +} + +const indexesExistInCurrentSchema = `-- name: IndexesExistInCurrentSchema :many +WITH index_names AS ( + SELECT unnest($1::text[]) as index_name +) +SELECT index_names.index_name::text, + EXISTS ( + SELECT 1 + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = current_schema() + AND c.relname = index_names.index_name + AND c.relkind = 'i' + ) AS exists +FROM index_names +` + +type IndexesExistInCurrentSchemaRow struct { + IndexNamesIndexName string + Exists bool +} + +func (q *Queries) IndexesExistInCurrentSchema(ctx context.Context, db DBTX, indexNames []string) ([]*IndexesExistInCurrentSchemaRow, error) { + rows, err := db.Query(ctx, indexesExistInCurrentSchema, indexNames) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*IndexesExistInCurrentSchemaRow + for rows.Next() { + var i IndexesExistInCurrentSchemaRow + if err := rows.Scan(&i.IndexNamesIndexName, &i.Exists); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const tableExistsInCurrentSchema = `-- name: TableExistsInCurrentSchema :one +SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = current_schema() + AND table_name = $1::text +) +` + +func (q *Queries) 
TableExistsInCurrentSchema(ctx context.Context, db DBTX, tableName string) (bool, error) { + row := db.QueryRow(ctx, tableExistsInCurrentSchema, tableName) + var exists bool + err := row.Scan(&exists) + return exists, err +} diff --git a/internal/dbsqlc/query.sql.go b/internal/dbsqlc/query.sql.go index ce47ef82..cb83cb28 100644 --- a/internal/dbsqlc/query.sql.go +++ b/internal/dbsqlc/query.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.28.0 // source: query.sql package dbsqlc @@ -109,7 +109,7 @@ func (q *Queries) JobCountByState(ctx context.Context, db DBTX) ([]*JobCountBySt const jobListWorkflow = `-- name: JobListWorkflow :many SELECT - id, state, attempt, max_attempts, attempted_at, created_at, finalized_at, scheduled_at, priority, args, attempted_by, errors, kind, metadata, queue, tags + id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM river_job WHERE @@ -137,21 +137,23 @@ func (q *Queries) JobListWorkflow(ctx context.Context, db DBTX, arg *JobListWork var i RiverJob if err := rows.Scan( &i.ID, - &i.State, + &i.Args, &i.Attempt, - &i.MaxAttempts, &i.AttemptedAt, - &i.CreatedAt, - &i.FinalizedAt, - &i.ScheduledAt, - &i.Priority, - &i.Args, &i.AttemptedBy, + &i.CreatedAt, &i.Errors, + &i.FinalizedAt, &i.Kind, + &i.MaxAttempts, &i.Metadata, + &i.Priority, &i.Queue, + &i.State, + &i.ScheduledAt, &i.Tags, + &i.UniqueKey, + &i.UniqueStates, ); err != nil { return nil, err } diff --git a/internal/dbsqlc/river_client.sql b/internal/dbsqlc/river_client.sql new file mode 100644 index 00000000..f06edbf2 --- /dev/null +++ b/internal/dbsqlc/river_client.sql @@ -0,0 +1,8 @@ +CREATE UNLOGGED TABLE river_client ( + id text PRIMARY KEY NOT NULL, + created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, + paused_at timestamptz, + 
updated_at timestamptz NOT NULL, + CONSTRAINT name_length CHECK (char_length(id) > 0 AND char_length(id) < 128) +); diff --git a/internal/dbsqlc/river_client_queue.sql b/internal/dbsqlc/river_client_queue.sql new file mode 100644 index 00000000..784a9b0e --- /dev/null +++ b/internal/dbsqlc/river_client_queue.sql @@ -0,0 +1,14 @@ +CREATE UNLOGGED TABLE river_client_queue ( + river_client_id text NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + max_workers bigint NOT NULL DEFAULT 0, + metadata jsonb NOT NULL DEFAULT '{}', + num_jobs_completed bigint NOT NULL DEFAULT 0, + num_jobs_running bigint NOT NULL DEFAULT 0, + updated_at timestamptz NOT NULL, + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) +); diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql new file mode 100644 index 00000000..5019900d --- /dev/null +++ b/internal/dbsqlc/river_job.sql @@ -0,0 +1,38 @@ +CREATE TYPE river_job_state AS ENUM( + 'available', + 'cancelled', + 'completed', + 'discarded', + 'pending', + 'retryable', + 'running', + 'scheduled' +); + +CREATE TABLE river_job( + id bigserial PRIMARY KEY, + args jsonb NOT NULL DEFAULT '{}', + attempt smallint NOT NULL DEFAULT 0, + attempted_at timestamptz, + attempted_by text[], + created_at timestamptz NOT NULL DEFAULT NOW(), + errors jsonb[], + finalized_at timestamptz, + kind text NOT NULL, + max_attempts smallint NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}', + priority smallint NOT NULL DEFAULT 1, + queue text NOT NULL DEFAULT 'default', + state river_job_state NOT NULL DEFAULT 'available', + scheduled_at timestamptz NOT NULL DEFAULT NOW(), + tags varchar(255)[] NOT NULL DEFAULT '{}', + unique_key bytea, + 
unique_states bit(8), + CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) + ), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (char_length(queue) > 0 AND char_length(queue) < 128), + CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128) +); diff --git a/internal/dbsqlc/river_leader.sql b/internal/dbsqlc/river_leader.sql new file mode 100644 index 00000000..83a27c04 --- /dev/null +++ b/internal/dbsqlc/river_leader.sql @@ -0,0 +1,8 @@ +CREATE UNLOGGED TABLE river_leader( + elected_at timestamptz NOT NULL, + expires_at timestamptz NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY DEFAULT 'default', + CONSTRAINT name_length CHECK (name = 'default'), + CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) +); diff --git a/internal/dbsqlc/river_migration.sql b/internal/dbsqlc/river_migration.sql new file mode 100644 index 00000000..4afe22d9 --- /dev/null +++ b/internal/dbsqlc/river_migration.sql @@ -0,0 +1,8 @@ +CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); diff --git a/internal/dbsqlc/river_producer.sql b/internal/dbsqlc/river_producer.sql new file mode 100644 index 00000000..82e2a184 --- /dev/null +++ b/internal/dbsqlc/river_producer.sql @@ -0,0 +1,25 @@ +CREATE UNLOGGED TABLE river_producer ( + id bigserial PRIMARY KEY, + client_id text NOT NULL, + queue_name text NOT NULL, + + max_workers int NOT NULL CHECK (max_workers >= 0), + metadata jsonb NOT NULL DEFAULT '{}', + paused_at timestamptz, + created_at timestamptz NOT NULL 
DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +-- name: ProducerListByQueue :many +SELECT + sqlc.embed(river_producer), + COALESCE( + ( + SELECT SUM((value->>'count')::int) + FROM jsonb_each(metadata->'concurrency'->'running') + ), + 0 + )::int as running +FROM river_producer +WHERE queue_name = @queue_name +ORDER BY id ASC; diff --git a/internal/dbsqlc/river_producer.sql.go b/internal/dbsqlc/river_producer.sql.go new file mode 100644 index 00000000..af5f2183 --- /dev/null +++ b/internal/dbsqlc/river_producer.sql.go @@ -0,0 +1,60 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.28.0 +// source: river_producer.sql + +package dbsqlc + +import ( + "context" +) + +const producerListByQueue = `-- name: ProducerListByQueue :many +SELECT + river_producer.id, river_producer.client_id, river_producer.queue_name, river_producer.max_workers, river_producer.metadata, river_producer.paused_at, river_producer.created_at, river_producer.updated_at, + COALESCE( + ( + SELECT SUM((value->>'count')::int) + FROM jsonb_each(metadata->'concurrency'->'running') + ), + 0 + )::int as running +FROM river_producer +WHERE queue_name = $1 +ORDER BY id ASC +` + +type ProducerListByQueueRow struct { + RiverProducer RiverProducer + Running int32 +} + +func (q *Queries) ProducerListByQueue(ctx context.Context, db DBTX, queueName string) ([]*ProducerListByQueueRow, error) { + rows, err := db.Query(ctx, producerListByQueue, queueName) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ProducerListByQueueRow + for rows.Next() { + var i ProducerListByQueueRow + if err := rows.Scan( + &i.RiverProducer.ID, + &i.RiverProducer.ClientID, + &i.RiverProducer.QueueName, + &i.RiverProducer.MaxWorkers, + &i.RiverProducer.Metadata, + &i.RiverProducer.PausedAt, + &i.RiverProducer.CreatedAt, + &i.RiverProducer.UpdatedAt, + &i.Running, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + 
return nil, err + } + return items, nil +} diff --git a/internal/dbsqlc/river_queue.sql b/internal/dbsqlc/river_queue.sql new file mode 100644 index 00000000..dd49cd2c --- /dev/null +++ b/internal/dbsqlc/river_queue.sql @@ -0,0 +1,8 @@ +CREATE TABLE river_queue( + name text PRIMARY KEY NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, + paused_at timestamptz, + updated_at timestamptz NOT NULL +); + diff --git a/internal/dbsqlc/sqlc.yaml b/internal/dbsqlc/sqlc.yaml index 65f0e9b4..1b18cf71 100644 --- a/internal/dbsqlc/sqlc.yaml +++ b/internal/dbsqlc/sqlc.yaml @@ -2,9 +2,18 @@ version: "2" sql: - engine: "postgresql" queries: - - "query.sql" - - "workflow.sql" - schema: "migrations" + - pg_misc.sql + - river_producer.sql + - query.sql + - workflow.sql + schema: + - river_client.sql + - river_client_queue.sql + - river_job.sql + - river_leader.sql + - river_migration.sql + - river_producer.sql + - river_queue.sql gen: go: package: "dbsqlc" diff --git a/internal/dbsqlc/workflow.sql.go b/internal/dbsqlc/workflow.sql.go index 3da09f5c..e836fa2d 100644 --- a/internal/dbsqlc/workflow.sql.go +++ b/internal/dbsqlc/workflow.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.27.0 +// sqlc v1.28.0 // source: workflow.sql package dbsqlc diff --git a/src/App.tsx b/src/App.tsx index e6e373ff..544e3a7e 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -1,9 +1,12 @@ import { Providers } from "@providers"; +import { type Features, featuresKey, getFeatures } from "@services/features"; import { QueryClient } from "@tanstack/react-query"; import { createRouter, RouterProvider } from "@tanstack/react-router"; -import { ReactElement } from "react"; import "./global-type-overrides"; + +import { ReactElement, useEffect, useState } from "react"; + // Import the generated route tree import { routeTree } from "./routeTree.gen"; @@ -13,7 +16,14 @@ const queryClient = new QueryClient(); const router = createRouter({ basepath: window && window.__riverUiBasePath ? window.__riverUiBasePath() : "/", - context: { queryClient }, + context: { + features: { + hasClientTable: false, + hasProducerTable: false, + hasWorkflows: false, + } as Features, + queryClient, + }, routeTree, trailingSlash: "preserve", }); @@ -26,6 +36,30 @@ declare module "@tanstack/react-router" { } export const App = (): ReactElement => { + const [isReady, setIsReady] = useState(false); + + useEffect(() => { + // Fetch features before rendering the app + queryClient + .fetchQuery({ + queryFn: getFeatures, + queryKey: featuresKey(), + }) + .then((features) => { + router.update({ + context: { + features, + queryClient, + }, + }); + setIsReady(true); + }); + }, []); + + if (!isReady) { + return
; + } + return ( diff --git a/src/components/QueueDetail.tsx b/src/components/QueueDetail.tsx index 8ef0231d..58a9f724 100644 --- a/src/components/QueueDetail.tsx +++ b/src/components/QueueDetail.tsx @@ -1,27 +1,46 @@ -import { Queue } from "@services/queues"; - -import TopNavTitleOnly from "./TopNavTitleOnly"; - -const Content = ({ loading, queue }: QueueDetailProps) => { - if (loading) { - return

Loading…

; - } - - if (!queue) { - return

Queue not found.

; - } - - return

Loaded queue {queue.name}

; -}; +import { Badge } from "@components/Badge"; +import { Button } from "@components/Button"; +import Logo from "@components/Logo"; +import RelativeTimeFormatter from "@components/RelativeTimeFormatter"; +import TagInput from "@components/TagInput"; +import TopNavTitleOnly from "@components/TopNavTitleOnly"; +import { useFeatures } from "@contexts/Features.hook"; +import { Switch } from "@headlessui/react"; +import { ArrowRightIcon } from "@heroicons/react/20/solid"; +import { PauseCircleIcon, PlayCircleIcon } from "@heroicons/react/24/outline"; +import { Features } from "@services/features"; +import { type Producer } from "@services/producers"; +import { + type ConcurrencyConfig, + type PartitionConfig, + type Queue, +} from "@services/queues"; +import clsx from "clsx"; +import { type ReactElement, useEffect, useMemo, useState } from "react"; type QueueDetailProps = { loading: boolean; name: string; + pauseQueue: (name: string) => void; + producers?: Producer[]; queue?: Queue; + resumeQueue: (name: string) => void; + updateQueueConcurrency: ( + name: string, + concurrency: ConcurrencyConfig | null, + ) => void; }; -const QueueDetail = (props: QueueDetailProps) => { - const { name } = props; +const QueueDetail = ({ + loading, + name, + pauseQueue, + producers, + queue, + resumeQueue, + updateQueueConcurrency, +}: QueueDetailProps) => { + const { features } = useFeatures(); return (
@@ -34,10 +53,706 @@ const QueueDetail = (props: QueueDetailProps) => { />
- + {loading ? ( +

Loading…

+ ) : !queue ? ( +

Queue not found.

+ ) : ( +
+ + + {features.hasProducerTable ? ( + <> + + + {producers && producers.length > 0 && ( + + )} + + ) : ( + + )} +
+ )}
); }; export default QueueDetail; + +type QueueStatusCardProps = { + features: Features; + pauseQueue: (name: string) => void; + producers?: Producer[]; + queue: Queue; + resumeQueue: (name: string) => void; +}; + +const PauseResumeButton = ({ + isPaused, + onClick, +}: { + isPaused: boolean; + onClick: () => void; +}): ReactElement => ( + +); + +const QueueStatusCard = ({ + features, + pauseQueue, + producers, + queue, + resumeQueue, +}: QueueStatusCardProps) => { + const totalRunning = useMemo( + () => + features.hasProducerTable + ? producers?.reduce((acc, producer) => acc + producer.running, 0) + : queue?.countRunning, + [features.hasProducerTable, producers, queue?.countRunning], + ); + + return ( +
+
+
+
+
+
+ {queue.pausedAt ? ( + Paused + ) : ( + Active + )} +
+ + Created{" "} + + +
+
+ +
+
+
+
+ Available +
+
+ {queue.countAvailable} +
+
+
+
+ Running +
+
+ {totalRunning} +
+
+
+
+ +
+ resumeQueue(queue.name) + : () => pauseQueue(queue.name) + } + /> +
+
+
+
+ ); +}; + +type ConcurrencySettingsProps = { + producers?: Producer[]; + queue: Queue; + updateQueueConcurrency: ( + name: string, + concurrency: ConcurrencyConfig | null, + ) => void; +}; + +const ConcurrencySettings = ({ + producers, + queue, + updateQueueConcurrency, +}: ConcurrencySettingsProps) => { + const [isEditMode, setIsEditMode] = useState(false); + const [initialForm, setInitialForm] = useState<{ + enabled: boolean; + globalLimit: number; + localLimit: number; + partitionArgs: string[]; + partitionByArgs: boolean; + partitionByKind: boolean; + }>({ + enabled: false, + globalLimit: 0, + localLimit: 0, + partitionArgs: [], + partitionByArgs: false, + partitionByKind: false, + }); + const [concurrencyForm, setConcurrencyForm] = useState<{ + enabled: boolean; + globalLimit: number; + localLimit: number; + partitionArgs: string[]; + partitionByArgs: boolean; + partitionByKind: boolean; + }>({ + enabled: false, + globalLimit: 0, + localLimit: 0, + partitionArgs: [], + partitionByArgs: false, + partitionByKind: false, + }); + + const producerConcurrencyStatus = useMemo(() => { + if (!producers || producers.length === 0) + return { config: null, consistent: false }; + + const firstProducer = producers[0]; + const allSame = producers.every((p) => { + if (!p.concurrency && !firstProducer.concurrency) return true; + if (!p.concurrency || !firstProducer.concurrency) return false; + + const basicSettingsMatch = + p.concurrency.global_limit === firstProducer.concurrency.global_limit && + p.concurrency.local_limit === firstProducer.concurrency.local_limit; + + if (!p.concurrency.partition && !firstProducer.concurrency.partition) { + return basicSettingsMatch; + } + + if (!p.concurrency.partition || !firstProducer.concurrency.partition) { + return false; + } + + const byKindMatch = + p.concurrency.partition.by_kind === + firstProducer.concurrency.partition.by_kind; + + const byArgsMatch = (() => { + const pArgs = p.concurrency.partition.by_args; + const firstArgs = 
firstProducer.concurrency.partition.by_args; + + if (!pArgs && !firstArgs) return true; + if (!pArgs || !firstArgs) return false; + + return ( + pArgs.length === firstArgs.length && + pArgs.every((arg, i) => arg === firstArgs[i]) + ); + })(); + + return basicSettingsMatch && byKindMatch && byArgsMatch; + }); + + return { + config: allSame ? firstProducer.concurrency : null, + consistent: allSame, + }; + }, [producers]); + + // Update form when queue or producer settings change, but only if not in edit mode + useEffect(() => { + if (!isEditMode) { + const config = + queue.concurrency || + (producerConcurrencyStatus.consistent + ? producerConcurrencyStatus.config + : null); + + const newFormValues = { + enabled: !!queue.concurrency, + globalLimit: config?.global_limit || 0, + localLimit: config?.local_limit || 0, + partitionArgs: config?.partition?.by_args + ? Array.isArray(config.partition.by_args) + ? config.partition.by_args + : [] + : [], + partitionByArgs: !!config?.partition?.by_args, + partitionByKind: !!config?.partition?.by_kind, + }; + + setConcurrencyForm(newFormValues); + setInitialForm(newFormValues); + } + }, [queue, producerConcurrencyStatus, isEditMode]); + + // Check if form is dirty (has changes) + const isDirty = useMemo(() => { + const partitionArgsChanged = () => { + if ( + concurrencyForm.partitionArgs.length !== + initialForm.partitionArgs.length + ) { + return true; + } + // Compare arrays + return concurrencyForm.partitionArgs.some( + (arg, index) => arg !== initialForm.partitionArgs[index], + ); + }; + + return ( + concurrencyForm.enabled !== initialForm.enabled || + concurrencyForm.globalLimit !== initialForm.globalLimit || + concurrencyForm.localLimit !== initialForm.localLimit || + concurrencyForm.partitionByArgs !== initialForm.partitionByArgs || + concurrencyForm.partitionByKind !== initialForm.partitionByKind || + (concurrencyForm.partitionByArgs && partitionArgsChanged()) + ); + }, [concurrencyForm, initialForm]); + + // Toggle edit 
mode when enabled is switched + const handleEnabledToggle = (enabled: boolean) => { + setIsEditMode(true); + setConcurrencyForm((prev) => ({ ...prev, enabled })); + }; + + // For any form field change + const handleFormChange = ( + key: K, + value: (typeof concurrencyForm)[K], + ) => { + // Enter edit mode if we're changing values when concurrency is already overridden + if (!isEditMode && concurrencyForm.enabled) { + setIsEditMode(true); + } + setConcurrencyForm((prev) => ({ ...prev, [key]: value })); + }; + + const saveConcurrency = () => { + if (!queue) return; + + let newConcurrency = null; + + if (concurrencyForm.enabled) { + const partition: PartitionConfig = { + by_args: concurrencyForm.partitionByArgs + ? concurrencyForm.partitionArgs.length > 0 + ? concurrencyForm.partitionArgs + : [] + : null, + by_kind: concurrencyForm.partitionByKind ? [] : null, + }; + + newConcurrency = { + global_limit: concurrencyForm.globalLimit, + local_limit: concurrencyForm.localLimit, + partition, + }; + } + + updateQueueConcurrency(queue.name, newConcurrency); + setIsEditMode(false); + setInitialForm(concurrencyForm); + }; + + const cancelEdit = () => { + setConcurrencyForm(initialForm); + setIsEditMode(false); + }; + + const isFormDisabled = !concurrencyForm.enabled; + + return ( +
+
+
+

+ Concurrency +

+ + {!producerConcurrencyStatus.consistent && ( +
+ Clients have different concurrency settings +
+ )} +
+

+ Control how many jobs can run simultaneously in this queue. +

+
+ +
+
+ + + + +
+ +
+ {/* Left column - Limits */} +
+

+ Limits +

+
+
+ + + handleFormChange( + "globalLimit", + e.target.value ? parseInt(e.target.value) : 0, + ) + } + placeholder="No limit" + type="number" + value={concurrencyForm.globalLimit} + /> +
+ +
+ + + handleFormChange( + "localLimit", + e.target.value ? parseInt(e.target.value) : 0, + ) + } + placeholder="No limit" + type="number" + value={concurrencyForm.localLimit} + /> +
+
+
+ + {/* Right column - Partitioning */} +
+

+ Partitioning +

+
+
+
+ + handleFormChange("partitionByKind", partitionByKind) + } + > + + + +
+
+ + handleFormChange("partitionByArgs", partitionByArgs) + } + > + + + +
+
+ +
+ + handleFormChange("partitionArgs", tags)} + placeholder="Type JSON key and press Enter to add" + tags={concurrencyForm.partitionArgs} + /> +

+ Leave empty to partition by all args (not recommended for high + cardinality). +

+
+
+
+
+ +
+ + +
+
+
+ ); +}; + +const ClientsTable = ({ producers }: { producers: Producer[] }) => { + return ( +
+

+ Clients +

+
+ + + + + + + + + + + {producers.map((producer) => ( + + + + + + + ))} + +
+ ID + + Created + + Running + + Status +
+ + {producer.clientId} + +
+
Running
+
+ {producer.running} running +
+
Created
+
+ {producer.createdAt && ( + + )} +
+
+
+ {producer.createdAt && ( + + )} + + {producer.running} + + {producer.pausedAt ? ( + Paused + ) : ( + Active + )} +
+
+
+ ); +}; + +const ConcurrencyLimitsDisabled = () => { + return ( +
+
+
+

+ Concurrency +

+
+

+ Control how many jobs can run simultaneously in this queue. +

+
+ +
+
+
+
+ + Pro +
+
+
+

+ Concurrency limits are included with River Pro. +

+

+ Control how many jobs can run simultaneously across your entire + application or within a single client with concurrency limits. +

+

+ Limits can be partitioned by job attributes to fine-tune + performance and resource usage per customer, region, or job type. +

+
+ +
+
+
+ ); +}; diff --git a/src/components/QueueList.tsx b/src/components/QueueList.tsx index 76059cb8..14913c8d 100644 --- a/src/components/QueueList.tsx +++ b/src/components/QueueList.tsx @@ -72,7 +72,12 @@ const QueueList = ({ - {queue.name} + + {queue.name} +
Available
diff --git a/src/components/TagInput.stories.ts b/src/components/TagInput.stories.ts new file mode 100644 index 00000000..a51de36d --- /dev/null +++ b/src/components/TagInput.stories.ts @@ -0,0 +1,105 @@ +import type { Meta, StoryObj } from "@storybook/react"; + +import { type BadgeColor } from "./Badge"; +import TagInput from "./TagInput"; + +const meta: Meta = { + argTypes: { + badgeColor: { + control: "select", + }, + onChange: { action: "changed" }, + showHelpText: { + control: "boolean", + description: "Show the helper text below the input", + }, + }, + component: TagInput, + tags: ["autodocs"], + title: "Components/TagInput", +}; + +export default meta; + +type Story = StoryObj; + +export const Empty: Story = { + args: { + disabled: false, + placeholder: "Type and press Enter to add", + showHelpText: false, + tags: [], + }, +}; + +export const EmptyWithHelp: Story = { + args: { + disabled: false, + placeholder: "Type and press Enter to add", + showHelpText: true, + tags: [], + }, +}; + +export const WithTags: Story = { + args: { + badgeColor: "indigo", + disabled: false, + showHelpText: false, + tags: ["customer_id", "region", "user_id"], + }, +}; + +export const WithTagsAndHelp: Story = { + args: { + badgeColor: "indigo", + disabled: false, + showHelpText: true, + tags: ["customer_id", "region", "user_id"], + }, +}; + +export const WithManyTags: Story = { + args: { + disabled: false, + tags: [ + "customer_id", + "region", + "user_id", + "order_id", + "product_id", + "session_id", + "long_key_name_with_many_characters", + ], + }, +}; + +export const BlueBadges: Story = { + args: { + badgeColor: "blue" as BadgeColor, + disabled: false, + tags: ["customer_id", "region", "user_id"], + }, +}; + +export const GreenBadges: Story = { + args: { + badgeColor: "green" as BadgeColor, + disabled: false, + tags: ["customer_id", "region", "user_id"], + }, +}; + +export const Disabled: Story = { + args: { + disabled: true, + tags: ["customer_id", "region"], + }, +}; + 
+export const DisabledEmpty: Story = { + args: { + disabled: true, + tags: [], + }, +}; diff --git a/src/components/TagInput.tsx b/src/components/TagInput.tsx new file mode 100644 index 00000000..a90189ed --- /dev/null +++ b/src/components/TagInput.tsx @@ -0,0 +1,117 @@ +import { XMarkIcon } from "@heroicons/react/20/solid"; +import { KeyboardEvent, useEffect, useState } from "react"; + +import { Badge, type BadgeColor } from "./Badge"; + +export type TagInputProps = { + badgeColor?: BadgeColor; + disabled?: boolean; + id?: string; + name?: string; + onChange: (tags: string[]) => void; + placeholder?: string; + showHelpText?: boolean; + tags: string[]; +}; + +/** + * A component for inputting multiple tags or keys with a chip-like UI + */ +const TagInput = ({ + badgeColor = "indigo", + disabled = false, + id, + name, + onChange, + placeholder = "Type and press Enter to add", + showHelpText = false, + tags = [], +}: TagInputProps) => { + const [inputValue, setInputValue] = useState(""); + const [internalTags, setInternalTags] = useState(tags); + + // Update internal tags when external tags change + useEffect(() => { + setInternalTags(tags); + }, [tags]); + + const addTag = (tag: string) => { + const trimmedTag = tag.trim(); + if (trimmedTag && !internalTags.includes(trimmedTag)) { + const newTags = [...internalTags, trimmedTag]; + setInternalTags(newTags); + onChange(newTags); + } + setInputValue(""); + }; + + const removeTag = (tagToRemove: string) => { + const newTags = internalTags.filter((tag) => tag !== tagToRemove); + setInternalTags(newTags); + onChange(newTags); + }; + + const handleKeyDown = (e: KeyboardEvent) => { + if (e.key === "Enter" && inputValue) { + e.preventDefault(); + addTag(inputValue); + } else if ( + e.key === "Backspace" && + !inputValue && + internalTags.length > 0 + ) { + // Remove the last tag when backspace is pressed and input is empty + const newTags = [...internalTags]; + newTags.pop(); + setInternalTags(newTags); + onChange(newTags); 
+ } + }; + + return ( +
+
+ {internalTags.map((tag) => ( +
+ + {tag} + {!disabled && ( + + )} + +
+ ))} + setInputValue(e.target.value)} + onKeyDown={handleKeyDown} + placeholder={internalTags.length === 0 ? placeholder : ""} + type="text" + value={inputValue} + /> +
+ {showHelpText && ( +

+ Enter multiple keys by typing each one and pressing Enter +

+ )} +
+ ); +}; + +export default TagInput; diff --git a/src/contexts/Features.hook.tsx b/src/contexts/Features.hook.tsx new file mode 100644 index 00000000..d0f0b9d1 --- /dev/null +++ b/src/contexts/Features.hook.tsx @@ -0,0 +1,11 @@ +import { useContext } from "react"; + +import { FeaturesContext } from "./Features"; + +export function useFeatures() { + const context = useContext(FeaturesContext); + if (context === undefined) { + throw new Error("useFeatures must be used within a FeaturesProvider"); + } + return context; +} diff --git a/src/contexts/Features.provider.tsx b/src/contexts/Features.provider.tsx new file mode 100644 index 00000000..94274c17 --- /dev/null +++ b/src/contexts/Features.provider.tsx @@ -0,0 +1,24 @@ +import { featuresKey, getFeatures } from "@services/features"; +import { useQuery } from "@tanstack/react-query"; + +import { FeaturesContext } from "./Features"; + +export function FeaturesProvider({ children }: { children: React.ReactNode }) { + const { data: features, isLoading } = useQuery({ + queryFn: getFeatures, + queryKey: featuresKey(), + // Refetch every 30 minutes, these are unlikely to change much: + refetchInterval: 30 * 60 * 1000, + }); + + // Block rendering until features are loaded: + if (isLoading || !features) { + return
; + } + + return ( + + {children} + + ); +} diff --git a/src/contexts/Features.tsx b/src/contexts/Features.tsx new file mode 100644 index 00000000..4a333624 --- /dev/null +++ b/src/contexts/Features.tsx @@ -0,0 +1,10 @@ +import { type Features } from "@services/features"; +import { createContext } from "react"; + +export interface UseFeaturesProps { + features: Features; +} + +export const FeaturesContext = createContext( + undefined, +); diff --git a/src/providers.tsx b/src/providers.tsx index a29cabdb..db6d2312 100644 --- a/src/providers.tsx +++ b/src/providers.tsx @@ -1,5 +1,6 @@ "use client"; +import { FeaturesProvider } from "@contexts/Features.provider"; import { RefreshSettingProvider } from "@contexts/RefreshSettings.provider"; import { SidebarSettingProvider } from "@contexts/SidebarSetting.provider"; import { queryClient } from "@services/queryClient"; @@ -13,9 +14,11 @@ export function Providers({ children }: { children: React.ReactNode }) { - - {children} - + + + {children} + + diff --git a/src/routes/__root.tsx b/src/routes/__root.tsx index 8c02be46..8e12b44a 100644 --- a/src/routes/__root.tsx +++ b/src/routes/__root.tsx @@ -1,12 +1,24 @@ import { Root } from "@components/Root"; +import { type Features, featuresKey, getFeatures } from "@services/features"; import { QueryClient } from "@tanstack/react-query"; import { createRootRouteWithContext } from "@tanstack/react-router"; -export const Route = createRootRouteWithContext<{ queryClient: QueryClient }>()( - { - component: RootComponent, +export const Route = createRootRouteWithContext<{ + features: Features; + queryClient: QueryClient; +}>()({ + beforeLoad: async ({ context: { queryClient } }) => { + const features = await queryClient.ensureQueryData({ + queryKey: featuresKey(), + queryFn: getFeatures, + }); + + return { + features, + }; }, -); + component: RootComponent, +}); function RootComponent() { return ; diff --git a/src/routes/jobs/index.tsx b/src/routes/jobs/index.tsx index 
894a7f47..a05130a9 100644 --- a/src/routes/jobs/index.tsx +++ b/src/routes/jobs/index.tsx @@ -56,13 +56,7 @@ export const Route = createFileRoute("/jobs/")({ loaderDeps: ({ search: { limit, state } }) => { return { limit: limit || minimumLimit, state }; }, - loader: async ({ context, deps: { limit, state } }) => { - if (!context) { - // workaround for this issue: - // https://github.com/TanStack/router/issues/1751 - return; - } - const { queryClient } = context; + loader: async ({ context: { queryClient }, deps: { limit, state } }) => { // TODO: how to pass abortController.signal into ensureQueryData or queryOptions? // signal: abortController.signal, await Promise.all([ diff --git a/src/routes/queues/$name.tsx b/src/routes/queues/$name.tsx index 10320ad9..2cb8732b 100644 --- a/src/routes/queues/$name.tsx +++ b/src/routes/queues/$name.tsx @@ -1,28 +1,52 @@ import QueueDetail from "@components/QueueDetail"; import { useRefreshSetting } from "@contexts/RefreshSettings.hook"; -import { getQueue, getQueueKey } from "@services/queues"; -import { useQuery } from "@tanstack/react-query"; +import { listProducers, listProducersKey } from "@services/producers"; +import { + type ConcurrencyConfig, + getQueue, + getQueueKey, + pauseQueue, + resumeQueue, + updateQueue, +} from "@services/queues"; +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { createFileRoute, ErrorComponent } from "@tanstack/react-router"; -// import QueueDetail from "@components/QueueDetail"; import { NotFoundError } from "@utils/api"; export const Route = createFileRoute("/queues/$name")({ parseParams: ({ name }) => ({ name }), stringifyParams: ({ name }) => ({ name: `${name}` }), - beforeLoad: ({ abortController, params: { name } }) => { + beforeLoad: ({ + abortController, + params: { name }, + context: { features }, + }) => { return { - queryOptions: { + producersQueryOptions: { + queryKey: listProducersKey(name), + queryFn: listProducers, + signal: 
abortController.signal, + enabled: features.hasProducerTable, + }, + queueQueryOptions: { queryKey: getQueueKey(name), queryFn: getQueue, - refetchInterval: 2000, signal: abortController.signal, }, }; }, - loader: async ({ context: { queryClient, queryOptions } }) => { - await queryClient.ensureQueryData(queryOptions); + loader: async ({ + context: { queryClient, queueQueryOptions, producersQueryOptions }, + }) => { + await Promise.all([ + queryClient.ensureQueryData(queueQueryOptions), + // Don't wait for or issue the producers query if it's not enabled: + ...(producersQueryOptions.enabled + ? [queryClient.ensureQueryData(producersQueryOptions)] + : []), + ]); }, errorComponent: ({ error }) => { @@ -38,14 +62,79 @@ export const Route = createFileRoute("/queues/$name")({ function QueueComponent() { const { name } = Route.useParams(); - const { queryOptions } = Route.useRouteContext(); + const { queueQueryOptions, producersQueryOptions } = Route.useRouteContext(); const refreshSettings = useRefreshSetting(); - queryOptions.refetchInterval = refreshSettings.intervalMs; + const { features } = Route.useRouteContext(); + const queryClient = useQueryClient(); + + const queueQuery = useQuery({ + ...queueQueryOptions, + refetchInterval: refreshSettings.intervalMs, + }); + const producersQuery = useQuery({ + ...producersQueryOptions, + refetchInterval: refreshSettings.intervalMs, + }); + + const loading = + queueQuery.isLoading || + (features.hasProducerTable && producersQuery.isLoading); + + const invalidateQueue = () => { + return queryClient.invalidateQueries({ + queryKey: getQueueKey(name), + }); + }; + + // Mutations for queue actions + const pauseMutation = useMutation({ + mutationFn: async (queueName: string) => pauseQueue({ name: queueName }), + throwOnError: true, + onSuccess: invalidateQueue, + }); + + const resumeMutation = useMutation({ + mutationFn: async (queueName: string) => resumeQueue({ name: queueName }), + throwOnError: true, + onSuccess: 
invalidateQueue, + }); + + const updateQueueMutation = useMutation({ + mutationFn: async ({ + queueName, + concurrencyConfig, + }: { + concurrencyConfig?: ConcurrencyConfig | null; + queueName: string; + }) => + updateQueue({ + name: queueName, + concurrency: concurrencyConfig, + }), + throwOnError: true, + onSuccess: invalidateQueue, + }); - const queueQuery = useQuery(queryOptions); - const { data: queue } = queueQuery; + // Wrapper for updateQueueConcurrency to match component prop signature + const handleUpdateQueueConcurrency = ( + queueName: string, + concurrency?: ConcurrencyConfig | null, + ) => { + updateQueueMutation.mutate({ + queueName, + concurrencyConfig: concurrency, + }); + }; return ( - + ); } diff --git a/src/services/features.ts b/src/services/features.ts new file mode 100644 index 00000000..a412865a --- /dev/null +++ b/src/services/features.ts @@ -0,0 +1,32 @@ +import type { QueryFunction } from "@tanstack/react-query"; + +import { API } from "@utils/api"; + +import { SnakeToCamelCase } from "./types"; + +export type Features = { + [Key in keyof FeaturesFromAPI as SnakeToCamelCase]: FeaturesFromAPI[Key]; +}; + +type FeaturesFromAPI = { + has_client_table: boolean; + has_producer_table: boolean; + has_workflows: boolean; +}; + +export const featuresKey = () => ["features"] as const; +export type FeaturesKey = ReturnType; + +const apiFeaturesToFeatures = (features: FeaturesFromAPI): Features => ({ + hasClientTable: features.has_client_table, + hasProducerTable: features.has_producer_table, + hasWorkflows: features.has_workflows, +}); + +export const getFeatures: QueryFunction = async ({ + signal, +}) => { + return API.get({ path: "/features" }, { signal }).then( + apiFeaturesToFeatures, + ); +}; diff --git a/src/services/producers.ts b/src/services/producers.ts new file mode 100644 index 00000000..19b3f780 --- /dev/null +++ b/src/services/producers.ts @@ -0,0 +1,56 @@ +import type { QueryFunction } from "@tanstack/react-query"; + +import { API } 
from "@utils/api"; + +import { ListResponse } from "./listResponse"; +import { ConcurrencyConfig } from "./queues"; +import { SnakeToCamelCase, StringEndingWithUnderscoreAt } from "./types"; + +export type Producer = { + [Key in keyof ProducerFromAPI as SnakeToCamelCase]: Key extends + | StringEndingWithUnderscoreAt + | undefined + ? Date | undefined + : ProducerFromAPI[Key]; +}; + +type ProducerFromAPI = { + client_id: string; + concurrency: ConcurrencyConfig | null; + created_at: string; + id: number; + max_workers: number; + paused_at: null | string; + queue_name: string; + running: number; + updated_at: string; +}; + +export const listProducersKey = (queueName: string) => + ["listProducers", queueName] as const; +export type ListProducersKey = ReturnType; + +const apiProducerToProducer = (producer: ProducerFromAPI): Producer => ({ + clientId: producer.client_id, + concurrency: producer.concurrency, + createdAt: new Date(producer.created_at), + id: producer.id, + maxWorkers: producer.max_workers, + pausedAt: producer.paused_at ? 
new Date(producer.paused_at) : undefined, + queueName: producer.queue_name, + running: producer.running, + updatedAt: new Date(producer.updated_at), +}); + +export const listProducers: QueryFunction< + Producer[], + ListProducersKey +> = async ({ queryKey, signal }) => { + const [, queueName] = queryKey; + const query = new URLSearchParams({ queue_name: queueName }); + + return API.get>( + { path: "/producers", query }, + { signal }, + ).then((response) => response.data.map(apiProducerToProducer)); +}; diff --git a/src/services/queues.ts b/src/services/queues.ts index e133847c..aaaf1e98 100644 --- a/src/services/queues.ts +++ b/src/services/queues.ts @@ -6,6 +6,17 @@ import type { SnakeToCamelCase, StringEndingWithUnderscoreAt } from "./types"; import { ListResponse } from "./listResponse"; +export interface ConcurrencyConfig { + global_limit: number; + local_limit: number; + partition: PartitionConfig; +} + +export interface PartitionConfig { + by_args: null | string[]; + by_kind: null | string[]; +} + export type Queue = { [Key in keyof QueueFromAPI as SnakeToCamelCase]: Key extends | StringEndingWithUnderscoreAt @@ -18,20 +29,20 @@ export type Queue = { // string dates instead of Date objects and keys as snake_case instead of // camelCase. export type QueueFromAPI = { + concurrency: ConcurrencyConfig | null; count_available: number; count_running: number; created_at: string; - metadata: object; name: string; paused_at?: string; updated_at: string; }; export const apiQueueToQueue = (queue: QueueFromAPI): Queue => ({ + concurrency: queue.concurrency, countAvailable: queue.count_available, countRunning: queue.count_running, createdAt: new Date(queue.created_at), - metadata: queue.metadata, name: queue.name, pausedAt: queue.paused_at ? 
new Date(queue.paused_at) : undefined, updatedAt: new Date(queue.updated_at), @@ -70,7 +81,6 @@ export const listQueues: QueryFunction = async ({ { signal }, ).then( // Map from QueueFromAPI to Queue: - // TODO: there must be a cleaner way to do this given the type definitions? (response) => response.data.map(apiQueueToQueue), ); }; @@ -90,3 +100,19 @@ export const resumeQueue: MutationFunction = async ({ }) => { return API.put(`/queues/${name}/resume`); }; + +type UpdateQueuePayload = { + concurrency?: ConcurrencyConfig | null; + name: string; +}; + +export const updateQueue: MutationFunction = async ({ + concurrency, + name, +}) => { + return API.patch(`/queues/${name}`, JSON.stringify({ concurrency }), { + headers: { + "Content-Type": "application/json", + }, + }); +}; diff --git a/src/stories/QueueDetail.stories.tsx b/src/stories/QueueDetail.stories.tsx new file mode 100644 index 00000000..b2b4bc5a --- /dev/null +++ b/src/stories/QueueDetail.stories.tsx @@ -0,0 +1,160 @@ +import QueueDetail from "@components/QueueDetail"; +import { type Producer } from "@services/producers"; +import { type ConcurrencyConfig } from "@services/queues"; +import { Meta, StoryObj } from "@storybook/react"; +import { producerFactory } from "@test/factories/producer"; +import { queueFactory } from "@test/factories/queue"; + +// Mock functions +const mockPauseQueue = (name: string) => { + console.log(`Pausing queue: ${name}`); +}; + +const mockResumeQueue = (name: string) => { + console.log(`Resuming queue: ${name}`); +}; + +const mockUpdateQueueConcurrency = ( + name: string, + concurrency: ConcurrencyConfig | null, +) => { + console.log(`Updating concurrency for queue ${name}:`, concurrency); +}; + +// Create consistent producers for stories +const createProducers = ( + count: number, + queueName: string, + options?: { + inconsistentConcurrency?: boolean; + paused?: boolean; + withConcurrency?: boolean; + }, +): Producer[] => { + return Array.from({ length: count }).map((_, i) => { + 
let producer = producerFactory.params({ queueName }).build(); + + if (options?.paused && i % 2 === 0) { + producer = producerFactory.paused().params({ queueName }).build(); + } + + if (options?.withConcurrency) { + producer = producerFactory + .withConcurrency() + .params({ queueName }) + .build(); + + // For inconsistent concurrency, modify some producer settings + if (options?.inconsistentConcurrency && i === 1) { + producer.concurrency!.global_limit = 20; + } + } + + return producer; + }); +}; + +const meta: Meta = { + args: { + loading: false, + name: "test-queue", + pauseQueue: mockPauseQueue, + resumeQueue: mockResumeQueue, + updateQueueConcurrency: mockUpdateQueueConcurrency, + }, + component: QueueDetail, + parameters: { + layout: "fullscreen", + }, + title: "Pages/QueueDetail", +}; + +export default meta; +type Story = StoryObj; + +// Loading state +export const Loading: Story = { + args: { + loading: true, + }, +}; + +// Queue not found +export const QueueNotFound: Story = { + args: { + loading: false, + queue: undefined, + }, +}; + +// Active queue with no producers (features disabled) +export const ActiveQueueWithoutPro: Story = { + args: { + producers: [], + queue: queueFactory.active().build(), + }, + parameters: { + features: { + hasProducerTable: false, + }, + }, +}; + +// Active queue with no producers (features enabled) +export const ActiveQueueNoProducers: Story = { + args: { + producers: [], + queue: queueFactory.active().build(), + }, +}; + +// Paused queue with no producers +export const PausedQueueNoProducers: Story = { + args: { + producers: [], + queue: queueFactory.paused().build(), + }, +}; + +// Active queue with producers +export const ActiveQueueWithProducers: Story = { + args: { + producers: createProducers(5, "test-queue"), + queue: queueFactory.active().build(), + }, +}; + +// Paused queue with some paused producers +export const PausedQueueWithMixedProducers: Story = { + args: { + producers: createProducers(5, "test-queue", { 
paused: true }), + queue: queueFactory.paused().build(), + }, +}; + +// Queue with concurrency settings +export const QueueWithConcurrencySettings: Story = { + args: { + producers: createProducers(3, "test-queue", { withConcurrency: true }), + queue: queueFactory.withConcurrency().build(), + }, +}; + +// Queue with inconsistent producer concurrency settings +export const QueueWithInconsistentConcurrency: Story = { + args: { + producers: createProducers(3, "test-queue", { + inconsistentConcurrency: true, + withConcurrency: true, + }), + queue: queueFactory.withConcurrency().build(), + }, +}; + +// Queue with many producers +export const QueueWithManyProducers: Story = { + args: { + producers: createProducers(20, "test-queue", { paused: true }), + queue: queueFactory.active().build(), + }, +}; diff --git a/src/test/factories/producer.ts b/src/test/factories/producer.ts new file mode 100644 index 00000000..f3b0d033 --- /dev/null +++ b/src/test/factories/producer.ts @@ -0,0 +1,55 @@ +import { faker } from "@faker-js/faker"; +import { type Producer } from "@services/producers"; +import { type ConcurrencyConfig } from "@services/queues"; +import { sub } from "date-fns"; +import { Factory } from "fishery"; + +class ProducerFactory extends Factory { + active() { + return this.params({ + pausedAt: undefined, + }); + } + + paused() { + return this.params({ + pausedAt: sub(new Date(), { minutes: 10 }), + }); + } + + withConcurrency(configOverrides?: Partial) { + // Create a valid ConcurrencyConfig with safe defaults + const concurrency: ConcurrencyConfig = { + global_limit: configOverrides?.global_limit ?? 10, + local_limit: configOverrides?.local_limit ?? 5, + partition: { + by_args: configOverrides?.partition?.by_args ?? [ + "customer_id", + "region", + ], + by_kind: configOverrides?.partition?.by_kind ?? 
null, + }, + }; + + return this.params({ + concurrency, + }); + } +} + +export const producerFactory = ProducerFactory.define(({ sequence }) => { + const createdAt = faker.date.recent({ days: 1 }); + const updatedAt = faker.date.recent({ days: 0.5 }); + + return { + clientId: `client-${sequence}`, + concurrency: null, + createdAt, + id: sequence, + maxWorkers: faker.number.int({ max: 50, min: 5 }), + pausedAt: undefined, + queueName: `queue-${faker.number.int({ max: 5, min: 1 })}`, + running: faker.number.int({ max: 20, min: 0 }), + updatedAt, + }; +}); diff --git a/src/test/factories/queue.ts b/src/test/factories/queue.ts new file mode 100644 index 00000000..f4522e4c --- /dev/null +++ b/src/test/factories/queue.ts @@ -0,0 +1,72 @@ +import { faker } from "@faker-js/faker"; +import { type ConcurrencyConfig, type Queue } from "@services/queues"; +import { sub } from "date-fns"; +import { Factory } from "fishery"; + +class QueueFactory extends Factory { + active() { + return this.params({ + pausedAt: undefined, + }); + } + + paused() { + return this.params({ + pausedAt: sub(new Date(), { minutes: 30 }), + }); + } + + withConcurrency(configOverrides?: Partial) { + // Create a valid ConcurrencyConfig with safe defaults + const concurrency: ConcurrencyConfig = { + global_limit: configOverrides?.global_limit ?? 10, + local_limit: configOverrides?.local_limit ?? 5, + partition: { + by_args: configOverrides?.partition?.by_args ?? [ + "customer_id", + "region", + ], + by_kind: configOverrides?.partition?.by_kind ?? 
null, + }, + }; + + return this.params({ + concurrency, + }); + } + + withoutConcurrency() { + return this.params({ + concurrency: null, + }); + } +} + +export const queueFactory = QueueFactory.define(({ params, sequence }) => { + const createdAt = params.createdAt || faker.date.recent({ days: 0.001 }); + const updatedAt = params.updatedAt || faker.date.recent({ days: 0.0001 }); + + // Create a properly typed concurrency config + let concurrency: ConcurrencyConfig | null = null; + if (params.concurrency) { + concurrency = { + global_limit: params.concurrency.global_limit || 0, + local_limit: params.concurrency.local_limit || 0, + partition: { + by_args: params.concurrency.partition?.by_args || null, + by_kind: params.concurrency.partition?.by_kind || null, + }, + }; + } + + return { + concurrency, + countAvailable: + params.countAvailable || faker.number.int({ max: 500, min: 0 }), + countRunning: params.countRunning || faker.number.int({ max: 100, min: 0 }), + createdAt, + name: params.name || `queue-${sequence}`, + pausedAt: params.pausedAt, + updatedAt, + }; +}); diff --git a/src/utils/api.ts b/src/utils/api.ts index bd4332d3..fc1eca88 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -7,6 +7,12 @@ export const API = { get: ({ path, query }: GetRequestOpts, config: RequestInit = {}) => request(APIUrl(path, query), config), + patch: ( + path: string, + body?: TBody, + config: RequestInit = {}, + ) => request(APIUrl(path), { ...config, body, method: "PATCH" }), + // Using `extends` to set a type constraint: post: ( path: string,