From 889479a1e6c000795dab5b8b6204d49171fd66b1 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 25 Sep 2025 20:52:57 -0500 Subject: [PATCH 1/5] dynamic feature flag extensions injected by pro bundle Move all Pro-related flags into the extensions setup so there's no need to check for these Pro artifacts in OSS. Also add a flag for durable periodic jobs. --- handler.go | 29 +-- handler_api_endpoint.go | 59 +----- handler_api_endpoint_test.go | 18 +- internal/apibundle/api_bundle.go | 9 +- internal/riveruicmd/riveruicmd.go | 4 +- riverproui/endpoints.go | 98 +++++++++- riverproui/endpoints_test.go | 177 ++++++++++++++++++ .../prohandler/pro_handler_api_endpoints.go | 2 +- .../pro_handler_api_endpoints_test.go | 11 +- riverproui/pro_handler_test.go | 63 +++++++ src/components/JobList.stories.ts | 17 +- src/components/JobList.test.tsx | 37 +--- src/components/SettingsPage.test.tsx | 13 +- src/hooks/use-settings.test.tsx | 13 +- src/services/features.test.ts | 22 ++- src/services/features.ts | 16 +- src/test/utils/features.ts | 15 ++ uiendpoints/bundle.go | 2 +- 18 files changed, 448 insertions(+), 157 deletions(-) create mode 100644 riverproui/endpoints_test.go create mode 100644 src/test/utils/features.ts diff --git a/handler.go b/handler.go index ccf8ea84..5f83e8fe 100644 --- a/handler.go +++ b/handler.go @@ -28,10 +28,6 @@ import ( "riverqueue.com/riverui/uiendpoints" ) -type endpointsExtensions interface { - Extensions() map[string]bool -} - // EndpointsOpts are the options for creating a new Endpoints bundle. type EndpointsOpts[TTx any] struct { // Tx is an optional transaction to wrap all database operations. It's mainly @@ -42,6 +38,7 @@ type EndpointsOpts[TTx any] struct { type endpoints[TTx any] struct { bundleOpts *uiendpoints.BundleOpts client *river.Client[TTx] + extensions func(ctx context.Context) (map[string]bool, error) opts *EndpointsOpts[TTx] } @@ -66,6 +63,19 @@ func (e *endpoints[TTx]) Configure(bundleOpts *uiendpoints.BundleOpts) { e.bundleOpts = bundleOpts } +func (e *endpoints[TTx]) Extensions(ctx context.Context) (map[string]bool, error) { + if e.extensions == nil { + return map[string]bool{}, nil + } + return e.extensions(ctx) +} + +// SetExtensionsProvider sets the extensions provider function for this bundle. +// This is a private "friend interface" method for use by riverproui. +func (e *endpoints[TTx]) SetExtensionsProvider(provider func(ctx context.Context) (map[string]bool, error)) { + e.extensions = provider +} + func (e *endpoints[TTx]) Validate() error { if e.client == nil { return errors.New("client is required") @@ -73,7 +83,7 @@ func (e *endpoints[TTx]) Validate() error { return nil } -func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface { +func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface { driver := e.client.Driver() var executor riverdriver.Executor if e.opts.Tx == nil { @@ -86,7 +96,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger Client: e.client, DB: executor, Driver: driver, - Extensions: extensions, + Extensions: e.Extensions, JobListHideArgsByDefault: e.bundleOpts.JobListHideArgsByDefault, Logger: logger, } @@ -232,12 +242,7 @@ func NewHandler(opts *HandlerOpts) (*Handler, error) { Validator: apitype.NewValidator(), } - extensions := map[string]bool{} - if withExtensions, ok := opts.Endpoints.(endpointsExtensions); ok { - extensions = withExtensions.Extensions() - } - - endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts, extensions) + endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts) var services []startstop.Service diff --git a/handler_api_endpoint.go b/handler_api_endpoint.go index ef2c0544..58a109d5 100644 --- a/handler_api_endpoint.go +++ b/handler_api_endpoint.go @@ -176,62 +176,19 @@ type featuresGetRequest struct{} type featuresGetResponse struct { Extensions map[string]bool `json:"extensions"` - HasClientTable bool `json:"has_client_table"` - HasProducerTable bool `json:"has_producer_table"` - HasSequenceTable bool `json:"has_sequence_table"` - HasWorkflows bool `json:"has_workflows"` JobListHideArgsByDefault bool `json:"job_list_hide_args_by_default"` } func (a *featuresGetEndpoint[TTx]) Execute(ctx context.Context, _ *featuresGetRequest) (*featuresGetResponse, error) { - return dbutil.WithTxV(ctx, a.DB, func(ctx context.Context, execTx riverdriver.ExecutorTx) (*featuresGetResponse, error) { - tx := a.Driver.UnwrapTx(execTx) - - schema := a.Client.Schema() - hasClientTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_client", - }) - if err != nil { - return nil, err - } - - hasProducerTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_producer", - }) - if err != nil { - return nil, err - } - - hasSequenceTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_job_sequence", - }) - if err != nil { - return nil, err - } - - indexResults, err := a.Driver.UnwrapExecutor(tx).IndexesExist(ctx, &riverdriver.IndexesExistParams{ - IndexNames: []string{ - "river_job_workflow_list_active", - "river_job_workflow_scheduling", - }, - Schema: schema, - }) - if err != nil { - return nil, err - } + extensions, err := a.Extensions(ctx) + if err != nil { + return nil, err + } - return &featuresGetResponse{ - Extensions: a.Extensions, - HasClientTable: hasClientTable, - HasProducerTable: hasProducerTable, - HasSequenceTable: hasSequenceTable, - HasWorkflows: indexResults["river_job_workflow_list_active"] || indexResults["river_job_workflow_scheduling"], - JobListHideArgsByDefault: a.JobListHideArgsByDefault, - }, nil - }) + return &featuresGetResponse{ + Extensions: extensions, + JobListHideArgsByDefault: a.JobListHideArgsByDefault, + }, nil } // diff --git a/handler_api_endpoint_test.go b/handler_api_endpoint_test.go index cd23af9d..86e029b0 100644 --- a/handler_api_endpoint_test.go +++ b/handler_api_endpoint_test.go @@ -48,7 +48,7 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu Client: client, DB: exec, Driver: driver, - Extensions: map[string]bool{}, + Extensions: func(_ context.Context) (map[string]bool, error) { return map[string]bool{}, nil }, Logger: logger, }) @@ -247,10 +247,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, &featuresGetResponse{ Extensions: map[string]bool{}, - HasClientTable: false, - HasProducerTable: false, - HasSequenceTable: false, - HasWorkflows: false, JobListHideArgsByDefault: false, }, resp) }) @@ -276,10 +272,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, &featuresGetResponse{ Extensions: map[string]bool{}, - HasClientTable: true, - HasProducerTable: true, - HasSequenceTable: true, - HasWorkflows: true, JobListHideArgsByDefault: true, }, resp) }) @@ -288,9 +280,11 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { t.Parallel() endpoint, _ := setupEndpoint(ctx, t, newFeaturesGetEndpoint) - endpoint.Extensions = map[string]bool{ - "test_1": true, - "test_2": false, + endpoint.Extensions = func(_ context.Context) (map[string]bool, error) { + return map[string]bool{ + "test_1": true, + "test_2": false, + }, nil } resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &featuresGetRequest{}) diff --git a/internal/apibundle/api_bundle.go b/internal/apibundle/api_bundle.go index e90127df..fda6db52 100644 --- a/internal/apibundle/api_bundle.go +++ b/internal/apibundle/api_bundle.go @@ -1,6 +1,7 @@ package apibundle import ( + "context" "log/slog" "github.com/riverqueue/river" @@ -14,7 +15,13 @@ type APIBundle[TTx any] struct { Client *river.Client[TTx] DB riverdriver.Executor Driver riverdriver.Driver[TTx] - Extensions map[string]bool + Extensions func(ctx context.Context) (map[string]bool, error) JobListHideArgsByDefault bool Logger *slog.Logger } + +// APIExtensionsProviderSetter is an interface to allow setting the extensions +// provider for the feature flag API. +type APIExtensionsProviderSetter interface { + SetExtensionsProvider(provider func(context.Context) (map[string]bool, error)) +} diff --git a/internal/riveruicmd/riveruicmd.go b/internal/riveruicmd/riveruicmd.go index 8facc522..5528d7e7 100644 --- a/internal/riveruicmd/riveruicmd.go +++ b/internal/riveruicmd/riveruicmd.go @@ -129,7 +129,7 @@ type initServerResult struct { uiHandler *riverui.Handler // River UI handler } -func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefix string, createClient func(*pgxpool.Pool) (TClient, error), createBundler func(TClient) uiendpoints.Bundle) (*initServerResult, error) { +func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefix string, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) { if !strings.HasPrefix(pathPrefix, "/") || pathPrefix == "" { return nil, fmt.Errorf("invalid path prefix: %s", pathPrefix) } @@ -170,7 +170,7 @@ func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefi uiHandler, err := riverui.NewHandler(&riverui.HandlerOpts{ DevMode: devMode, - Endpoints: createBundler(client), + Endpoints: createBundle(client), JobListHideArgsByDefault: jobListHideArgsByDefault, LiveFS: liveFS, Logger: logger, diff --git a/riverproui/endpoints.go b/riverproui/endpoints.go index a13d0fdb..132cabc2 100644 --- a/riverproui/endpoints.go +++ b/riverproui/endpoints.go @@ -1,11 +1,13 @@ package riverproui import ( + "context" "errors" "log/slog" "net/http" "github.com/riverqueue/apiframe/apiendpoint" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "riverqueue.com/riverpro" @@ -47,6 +49,12 @@ type endpoints[TTx any] struct { func (e *endpoints[TTx]) Configure(bundleOpts *uiendpoints.BundleOpts) { e.bundleOpts = bundleOpts + + // Inject extensions provider into OSS via private setter + if s, ok := e.ossEndpoints.(apibundle.APIExtensionsProviderSetter); ok { + s.SetExtensionsProvider(e.Extensions) + } + e.ossEndpoints.Configure(bundleOpts) } @@ -60,7 +68,85 @@ func (e *endpoints[TTx]) Validate() error { return nil } -func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface { +func (e *endpoints[TTx]) Extensions(ctx context.Context) (map[string]bool, error) { + ossDriver := e.client.Driver() + driver, ok := ossDriver.(prodriver.ProDriver[TTx]) + if !ok { + panic("riverpro.Client is not configured with a ProDriver") + } + + var executor prodriver.ProExecutor + if e.proOpts.Tx == nil { + executor = driver.GetProExecutor() + } else { + executor = driver.UnwrapProExecutor(*e.proOpts.Tx) + } + + schema := e.client.Schema() + + execTx, err := executor.Begin(ctx) + if err != nil { + return nil, err + } + defer execTx.Rollback(ctx) + + hasClientTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_client", + }) + if err != nil { + return nil, err + } + + hasPeriodicJobTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_periodic_job", + }) + if err != nil { + return nil, err + } + + hasProducerTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_producer", + }) + if err != nil { + return nil, err + } + + hasSequenceTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_job_sequence", + }) + if err != nil { + return nil, err + } + + indexResults, err := execTx.IndexesExist(ctx, &riverdriver.IndexesExistParams{ + IndexNames: []string{ + "river_job_workflow_list_active", + "river_job_workflow_scheduling", + }, + Schema: schema, + }) + if err != nil { + return nil, err + } + + hasWorkflows := indexResults["river_job_workflow_list_active"] || indexResults["river_job_workflow_scheduling"] + + return map[string]bool{ + "durable_periodic_jobs": hasPeriodicJobTable, + "producer_queries": true, + "workflow_queries": true, + "has_client_table": hasClientTable, + "has_producer_table": hasProducerTable, + "has_sequence_table": hasSequenceTable, + "has_workflows": hasWorkflows, + }, nil +} + +func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface { ossDriver := e.client.Driver() driver, ok := ossDriver.(prodriver.ProDriver[TTx]) if !ok { @@ -79,6 +165,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger Client: e.client.Client, DB: executor, Driver: driver, + Extensions: e.Extensions, JobListHideArgsByDefault: e.bundleOpts.JobListHideArgsByDefault, Logger: logger, }, @@ -86,7 +173,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger DB: executor, } - endpoints := e.ossEndpoints.MountEndpoints(archetype, logger, mux, mountOpts, extensions) + endpoints := e.ossEndpoints.MountEndpoints(archetype, logger, mux, mountOpts) endpoints = append(endpoints, apiendpoint.Mount(mux, prohandler.NewProducerListEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewWorkflowCancelEndpoint(bundle), mountOpts), @@ -97,10 +184,3 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger return endpoints } - -func (e *endpoints[TTx]) Extensions() map[string]bool { - return map[string]bool{ - "producer_queries": true, - "workflow_queries": true, - } -} diff --git a/riverproui/endpoints_test.go b/riverproui/endpoints_test.go new file mode 100644 index 00000000..887f64e8 --- /dev/null +++ b/riverproui/endpoints_test.go @@ -0,0 +1,177 @@ +package riverproui + +import ( + "context" + "log/slog" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + "riverqueue.com/riverpro" + + "riverqueue.com/riverui/internal/riverinternaltest" +) + +func TestProEndpointsExtensions(t *testing.T) { //nolint:paralleltest + // Most of these tests involve schema changes and can't be parallelized without + // causing deadlocks. + ctx := context.Background() + + type testBundle struct { + client *riverpro.Client[pgx.Tx] + endpoint *endpoints[pgx.Tx] + logger *slog.Logger + tx pgx.Tx + } + + setup := func(ctx context.Context, t *testing.T) *testBundle { + t.Helper() + + logger := riverinternaltest.Logger(t) + client, _ := insertOnlyProClient(t, logger) + + tx := riverinternaltest.TestTx(ctx, t) + + endpoint := &endpoints[pgx.Tx]{ + client: client, + proOpts: &EndpointsOpts[pgx.Tx]{Tx: &tx}, + } + + return &testBundle{ + client: client, + endpoint: endpoint, + logger: logger, + tx: tx, + } + } + + t.Run("DurablePeriodicJobs", func(t *testing.T) { //nolint:paralleltest + t.Run("NoPeriodicJobTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_periodic_job;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["durable_periodic_jobs"]) + }) + + t.Run("WithPeriodicJobTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_periodic_job (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["durable_periodic_jobs"]) + }) + }) + + t.Run("ClientTableDetection", func(t *testing.T) { //nolint:paralleltest + t.Run("NoClientTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_client CASCADE;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_client_table"]) + }) + + t.Run("WithClientTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_client (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_client_table"]) + }) + }) + + t.Run("ProducerTableDetection", func(t *testing.T) { //nolint:paralleltest + t.Run("NoProducerTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_producer;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_producer_table"]) + }) + + t.Run("WithProducerTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_producer (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_producer_table"]) + }) + }) + + t.Run("SequenceTableDetection", func(t *testing.T) { //nolint:paralleltest + t.Run("NoSequenceTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_job_sequence;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_sequence_table"]) + }) + + t.Run("WithSequenceTable", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_job_sequence (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_sequence_table"]) + }) + }) + + t.Run("WorkflowsDetection", func(t *testing.T) { //nolint:paralleltest + t.Run("NoWorkflowIndexes", func(t *testing.T) { //nolint:paralleltest + // This can't be parallelized because it tries to make DB schema changes. + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_list_active;`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_scheduling;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_workflows"]) + }) + }) + + t.Run("StaticAttributesAlwaysTrue", func(t *testing.T) { //nolint:paralleltest + bundle := setup(ctx, t) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["producer_queries"]) + require.True(t, ext["workflow_queries"]) + }) +} diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints.go b/riverproui/internal/prohandler/pro_handler_api_endpoints.go index 52bf6f52..193b319f 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints.go @@ -74,7 +74,7 @@ func (req *producerListRequest) ExtractRaw(r *http.Request) error { func (a *producerListEndpoint[TTx]) Execute(ctx context.Context, req *producerListRequest) (*listResponse[uitype.RiverProducer], error) { result, err := a.DB.ProducerListByQueue(ctx, &riverprodriver.ProducerListByQueueParams{ QueueName: req.QueueName, - Schema: "", // TODO: need to inject schema from Client or params + Schema: a.Client.Schema(), }) if err != nil { return nil, fmt.Errorf("error listing producers: %w", err) diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go index 00048c61..81453fe8 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go @@ -49,11 +49,12 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu endpoint := initFunc(ProAPIBundle[pgx.Tx]{ APIBundle: apibundle.APIBundle[pgx.Tx]{ - Archetype: riversharedtest.BaseServiceArchetype(t), - Client: client.Client, - DB: exec, - Driver: driver, - Extensions: map[string]bool{}, + Archetype: riversharedtest.BaseServiceArchetype(t), + Client: client.Client, + DB: exec, + Driver: driver, + // Extensions aren't needed for any of these test endpoints + Extensions: func(_ context.Context) (map[string]bool, error) { return map[string]bool{}, nil }, Logger: logger, }, Client: client, diff --git a/riverproui/pro_handler_test.go b/riverproui/pro_handler_test.go index d1c74859..c1945c93 100644 --- a/riverproui/pro_handler_test.go +++ b/riverproui/pro_handler_test.go @@ -2,9 +2,11 @@ package riverproui import ( "context" + "encoding/json" "fmt" "log/slog" "net/http" + "net/http/httptest" "testing" "github.com/google/uuid" @@ -76,6 +78,9 @@ func TestProHandlerIntegration(t *testing.T) { workflowID2 := uuid.New() _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Metadata: uicommontest.MustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID2})}) + // Verify OSS features endpoint is mounted and returns success even w/ Pro bundle: + makeAPICall(t, "FeaturesGet", http.MethodGet, "/api/features", nil) + makeAPICall(t, "ProducerList", http.MethodGet, "/api/pro/producers?queue_name="+queue.Name, nil) makeAPICall(t, "WorkflowCancel", http.MethodPost, fmt.Sprintf("/api/pro/workflows/%s/cancel", workflowID), nil) makeAPICall(t, "WorkflowGet", http.MethodGet, fmt.Sprintf("/api/pro/workflows/%s", workflowID2), nil) @@ -84,3 +89,61 @@ func TestProHandlerIntegration(t *testing.T) { handlertest.RunIntegrationTest(t, insertOnlyProClient, createBundle, createHandler, testRunner) } + +func TestProFeaturesEndpointResponse(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := riverinternaltest.Logger(t) + + client, _ := insertOnlyProClient(t, logger) + tx := riverinternaltest.TestTx(ctx, t) + + bundle := NewEndpoints(client, &EndpointsOpts[pgx.Tx]{Tx: &tx}) + + // Reuse the same handler creation pattern as integration tests + handler := func() http.Handler { + logger := riverinternaltest.Logger(t) + opts := &riverui.HandlerOpts{ + DevMode: true, + Endpoints: bundle, + LiveFS: false, + Logger: logger, + } + h, err := riverui.NewHandler(opts) + require.NoError(t, err) + return h + }() + + recorder := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/features", nil) + + handler.ServeHTTP(recorder, req) + + status := recorder.Result().StatusCode + require.Equal(t, http.StatusOK, status) + + contentType := recorder.Header().Get("Content-Type") + // apiframe sets JSON content-type with charset; allow either exact or prefixed + require.Contains(t, contentType, "application/json") + + var resp struct { + Extensions map[string]bool `json:"extensions"` + JobListHideArgsByDefault bool `json:"job_list_hide_args_by_default"` + } + + require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp)) + require.NotNil(t, resp.Extensions) + + // Static flags always true; dynamic flags should also be true because pro migrations run for tests + expectedExtensions := map[string]bool{ + "durable_periodic_jobs": true, // dynamic + "has_client_table": true, // dynamic + "has_producer_table": true, // dynamic + "has_sequence_table": true, // dynamic + "has_workflows": true, // dynamic + "producer_queries": true, // static + "workflow_queries": true, // static + } + require.Equal(t, expectedExtensions, resp.Extensions) +} diff --git a/src/components/JobList.stories.ts b/src/components/JobList.stories.ts index a80e06c3..d8c1f124 100644 --- a/src/components/JobList.stories.ts +++ b/src/components/JobList.stories.ts @@ -4,6 +4,7 @@ import { useFeatures } from "@contexts/Features.hook"; import { useSettings } from "@hooks/use-settings"; import { JobState } from "@services/types"; import { jobMinimalFactory } from "@test/factories/job"; +import { createFeatures } from "@test/utils/features"; import { vi } from "vitest"; import JobList from "./JobList"; @@ -38,9 +39,9 @@ export const Running: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }, }, { @@ -64,9 +65,9 @@ export const ArgsHiddenByDefault: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }, }, { @@ -90,9 +91,9 @@ export const ArgsVisibleUserOverride: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }, }, { @@ -116,9 +117,9 @@ export const ArgsHiddenUserOverride: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }, }, { diff --git a/src/components/JobList.test.tsx b/src/components/JobList.test.tsx index eb01df6d..fda42b07 100644 --- a/src/components/JobList.test.tsx +++ b/src/components/JobList.test.tsx @@ -2,6 +2,7 @@ import { FeaturesContext } from "@contexts/Features"; import { useSettings } from "@hooks/use-settings"; import { JobState } from "@services/types"; import { jobMinimalFactory } from "@test/factories/job"; +import { createFeatures } from "@test/utils/features"; import { render, screen } from "@testing-library/react"; import { describe, expect, it, vi } from "vitest"; @@ -33,14 +34,9 @@ vi.mock("@hooks/use-settings", () => ({ describe("JobList", () => { it("shows job args by default", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: false, - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with no override (useSettings as unknown as ReturnType).mockReturnValue({ @@ -70,14 +66,9 @@ describe("JobList", () => { it("hides job args when jobListHideArgsByDefault is true", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: true, - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with no override (useSettings as unknown as ReturnType).mockReturnValue({ @@ -109,14 +100,9 @@ describe("JobList", () => { it("shows job args when user overrides default hide setting", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: true, // Server default is to hide - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with override to show args (useSettings as unknown as ReturnType).mockReturnValue({ @@ -147,14 +133,9 @@ describe("JobList", () => { it("hides job args when user overrides default show setting", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: false, // Server default is to show - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with override to hide args (useSettings as unknown as ReturnType).mockReturnValue({ diff --git a/src/components/SettingsPage.test.tsx b/src/components/SettingsPage.test.tsx index c8faa0a3..0b078bc1 100644 --- a/src/components/SettingsPage.test.tsx +++ b/src/components/SettingsPage.test.tsx @@ -1,5 +1,6 @@ import { useFeatures } from "@contexts/Features.hook"; import { useSettings } from "@hooks/use-settings"; +import { createFeatures } from "@test/utils/features"; import { fireEvent, render, screen } from "@testing-library/react"; import { describe, expect, it, vi } from "vitest"; @@ -29,9 +30,9 @@ describe("SettingsPage", () => { // Mock features (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }); render(); @@ -69,9 +70,9 @@ describe("SettingsPage", () => { // Mock features (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }); render(); @@ -104,9 +105,9 @@ describe("SettingsPage", () => { // Mock features (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }); render(); diff --git a/src/hooks/use-settings.test.tsx b/src/hooks/use-settings.test.tsx index fd7bd5ea..92c86486 100644 --- a/src/hooks/use-settings.test.tsx +++ b/src/hooks/use-settings.test.tsx @@ -1,5 +1,6 @@ import { useFeatures } from "@contexts/Features.hook"; import { $userSettings } from "@stores/settings"; +import { createFeatures } from "@test/utils/features"; import { renderHook } from "@testing-library/react"; import { describe, expect, it, vi } from "vitest"; @@ -19,9 +20,9 @@ describe("useSettings", () => { it("should return default show job args value when no override", () => { // Mock Features hook (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }); // Mock empty settings @@ -34,9 +35,9 @@ describe("useSettings", () => { it("should return override when set to true", () => { // Mock Features hook with hide args by default (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }); // Mock settings with showJobArgs true @@ -49,9 +50,9 @@ describe("useSettings", () => { it("should return override when set to false", () => { // Mock Features hook with show args by default (useFeatures as unknown as ReturnType).mockReturnValue({ - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }); // Mock settings with showJobArgs false diff --git a/src/services/features.test.ts b/src/services/features.test.ts index 7e99d023..bfd2e548 100644 --- a/src/services/features.test.ts +++ b/src/services/features.test.ts @@ -6,18 +6,21 @@ describe("apiFeaturesToFeatures", () => { it("converts API features to frontend features", () => { const apiFeatures = { extensions: { + durable_periodic_jobs: true, + has_client_table: true, + has_producer_table: true, + has_workflows: true, producer_queries: true, workflow_queries: true, }, - has_client_table: true, - has_producer_table: true, - has_workflows: true, job_list_hide_args_by_default: true, - }; + } as const; const expected = { + durablePeriodicJobs: true, hasClientTable: true, hasProducerTable: true, + hasSequenceTable: false, hasWorkflows: true, jobListHideArgsByDefault: true, producerQueries: true, @@ -30,18 +33,21 @@ describe("apiFeaturesToFeatures", () => { it("handles false values", () => { const apiFeatures = { extensions: { + durable_periodic_jobs: false, + has_client_table: false, + has_producer_table: false, + has_workflows: false, producer_queries: false, workflow_queries: false, }, - has_client_table: false, - has_producer_table: false, - has_workflows: false, job_list_hide_args_by_default: false, - }; + } as const; const expected = { + durablePeriodicJobs: false, hasClientTable: false, hasProducerTable: false, + hasSequenceTable: false, hasWorkflows: false, jobListHideArgsByDefault: false, producerQueries: false, diff --git a/src/services/features.ts b/src/services/features.ts index 29cb6fe2..f9a51f78 100644 --- a/src/services/features.ts +++ b/src/services/features.ts @@ -13,13 +13,18 @@ export type Features = { type FeaturesFromAPI = { extensions: Record; - has_client_table: boolean; - has_producer_table: boolean; - has_workflows: boolean; job_list_hide_args_by_default: boolean; }; -const KNOWN_EXTENSIONS = ["producer_queries", "workflow_queries"] as const; +const KNOWN_EXTENSIONS = [ + "durable_periodic_jobs", + "producer_queries", + "workflow_queries", + "has_client_table", + "has_producer_table", + "has_sequence_table", + "has_workflows", +] as const; type KnownExtensionKey = (typeof KNOWN_EXTENSIONS)[number]; type KnownExtensions = { @@ -63,9 +68,6 @@ export const apiFeaturesToFeatures = (features: FeaturesFromAPI): Features => { } return { - hasClientTable: features.has_client_table, - hasProducerTable: features.has_producer_table, - hasWorkflows: features.has_workflows, jobListHideArgsByDefault: features.job_list_hide_args_by_default, ...completeKnownExtensions, }; diff --git a/src/test/utils/features.ts b/src/test/utils/features.ts new file mode 100644 index 00000000..c40af055 --- /dev/null +++ b/src/test/utils/features.ts @@ -0,0 +1,15 @@ +import type { Features } from "@services/features"; + +export const createFeatures = ( + overrides: Partial = {}, +): Features => ({ + durablePeriodicJobs: false, + hasClientTable: false, + hasProducerTable: false, + hasSequenceTable: false, + hasWorkflows: false, + jobListHideArgsByDefault: false, + producerQueries: false, + workflowQueries: false, + ...overrides, +}); diff --git a/uiendpoints/bundle.go b/uiendpoints/bundle.go index 28baae09..056db0c5 100644 --- a/uiendpoints/bundle.go +++ b/uiendpoints/bundle.go @@ -24,6 +24,6 @@ type BundleOpts struct { // requires the `riverpro` module to be installed. type Bundle interface { Configure(bundleOpts *BundleOpts) - MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface + MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface Validate() error } From 42e2da769c26da79195a0ec9f4c428cccb78b409 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 25 Sep 2025 23:15:05 -0500 Subject: [PATCH 2/5] add periodic job endpoint --- riverproui/endpoints.go | 1 + .../prohandler/pro_handler_api_endpoints.go | 53 +++++++++++++++++++ .../pro_handler_api_endpoints_test.go | 38 ++++++++++++- .../protestfactory/pro_test_factory.go | 44 +++++++++++++++ riverproui/internal/uitype/ui_api_types.go | 7 +++ riverproui/pro_handler_test.go | 8 +++ 6 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 riverproui/internal/protestfactory/pro_test_factory.go diff --git a/riverproui/endpoints.go b/riverproui/endpoints.go index 132cabc2..8472396e 100644 --- a/riverproui/endpoints.go +++ b/riverproui/endpoints.go @@ -175,6 +175,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger endpoints := e.ossEndpoints.MountEndpoints(archetype, logger, mux, mountOpts) endpoints = append(endpoints, + apiendpoint.Mount(mux, prohandler.NewPeriodicJobListEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewProducerListEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewWorkflowCancelEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewWorkflowGetEndpoint(bundle), mountOpts), diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints.go b/riverproui/internal/prohandler/pro_handler_api_endpoints.go index 193b319f..de0bc428 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints.go @@ -39,6 +39,59 @@ func listResponseFrom[T any](data []*T) *listResponse[T] { return &listResponse[T]{Data: data} } +// +// periodicJobListEndpoint +// + +type periodicJobListEndpoint[TTx any] struct { + ProAPIBundle[TTx] + apiendpoint.Endpoint[periodicJobListRequest, listResponse[uitype.RiverPeriodicJob]] +} + +func NewPeriodicJobListEndpoint[TTx any](apiBundle ProAPIBundle[TTx]) *periodicJobListEndpoint[TTx] { + return &periodicJobListEndpoint[TTx]{ProAPIBundle: apiBundle} +} + +func (*periodicJobListEndpoint[TTx]) Meta() *apiendpoint.EndpointMeta { + return &apiendpoint.EndpointMeta{ + Pattern: "GET /api/pro/periodic-jobs", + StatusCode: http.StatusOK, + } +} + +type periodicJobListRequest struct { + Limit *int `json:"-" validate:"omitempty,min=0,max=1000"` // from ExtractRaw +} + +func (req *periodicJobListRequest) ExtractRaw(r *http.Request) error { + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + limit, err := strconv.Atoi(limitStr) + if err != nil { + return apierror.NewBadRequestf("Couldn't convert `limit` to integer: %s.", err) + } + + req.Limit = &limit + } + + return nil +} + +func (a *periodicJobListEndpoint[TTx]) Execute(ctx context.Context, req *periodicJobListRequest) (*listResponse[uitype.RiverPeriodicJob], error) { + result, err := a.DB.PeriodicJobGetAll(ctx, &riverprodriver.PeriodicJobGetAllParams{ + Max: ptrutil.ValOrDefault(req.Limit, 100), + Schema: a.Client.Schema(), + }) + if err != nil { + return nil, fmt.Errorf("error listing periodic jobs: %w", err) + } + + return listResponseFrom(sliceutil.Map(result, internalPeriodicJobToSerializablePeriodicJob)), nil +} + +func internalPeriodicJobToSerializablePeriodicJob(internal *riverprodriver.PeriodicJob) *uitype.RiverPeriodicJob { + return (*uitype.RiverPeriodicJob)(internal) +} + // // producerListEndpoint // diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go index 81453fe8..dc29b81f 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go @@ -28,11 +28,12 @@ import ( "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" "riverqueue.com/riverui/internal/uicommontest" + "riverqueue.com/riverui/riverproui/internal/protestfactory" ) type setupEndpointTestBundle struct { client *riverpro.Client[pgx.Tx] - exec riverdriver.ExecutorTx + exec driver.ProExecutorTx logger *slog.Logger tx pgx.Tx } @@ -101,6 +102,41 @@ func testMountOpts(t *testing.T) *apiendpoint.MountOpts { } } +func TestProAPIHandlerPeriodicJobList(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, NewPeriodicJobListEndpoint) + + job1 := protestfactory.PeriodicJob(ctx, t, bundle.exec, &protestfactory.PeriodicJobOpts{ID: ptrutil.Ptr("alpha"), NextRunAt: ptrutil.Ptr(time.Now().Add(time.Minute))}) + job2 := protestfactory.PeriodicJob(ctx, t, bundle.exec, &protestfactory.PeriodicJobOpts{ID: ptrutil.Ptr("beta"), NextRunAt: ptrutil.Ptr(time.Now().Add(2 * time.Minute))}) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &periodicJobListRequest{}) + require.NoError(t, err) + require.Len(t, resp.Data, 2) + require.Equal(t, job1.ID, resp.Data[0].ID) + require.Equal(t, job2.ID, resp.Data[1].ID) + }) + + t.Run("Limit", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, NewPeriodicJobListEndpoint) + + job1 := protestfactory.PeriodicJob(ctx, t, bundle.exec, nil) + _ = protestfactory.PeriodicJob(ctx, t, bundle.exec, nil) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &periodicJobListRequest{Limit: ptrutil.Ptr(1)}) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + require.Equal(t, job1.ID, resp.Data[0].ID) + }) +} + func TestProAPIHandlerWorkflowCancel(t *testing.T) { t.Parallel() diff --git a/riverproui/internal/protestfactory/pro_test_factory.go b/riverproui/internal/protestfactory/pro_test_factory.go new file mode 100644 index 00000000..ee145a79 --- /dev/null +++ b/riverproui/internal/protestfactory/pro_test_factory.go @@ -0,0 +1,44 @@ +package protestfactory + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivershared/util/ptrutil" + + "riverqueue.com/riverpro/driver" +) + +type PeriodicJobOpts struct { + ID *string + NextRunAt *time.Time + UpdatedAt *time.Time +} + +func PeriodicJob(ctx context.Context, tb testing.TB, exec driver.ProExecutor, opts *PeriodicJobOpts) *driver.PeriodicJob { + tb.Helper() + + if opts == nil { + opts = &PeriodicJobOpts{} + } + + periodicJob, err := exec.PeriodicJobInsert(ctx, &driver.PeriodicJobInsertParams{ + ID: ptrutil.ValOrDefaultFunc(opts.ID, func() string { return fmt.Sprintf("periodic_job_%05d", nextSeq()) }), + NextRunAt: ptrutil.ValOrDefaultFunc(opts.NextRunAt, time.Now), + UpdatedAt: opts.UpdatedAt, + Schema: "", + }) + require.NoError(tb, err) + return periodicJob +} + +var seq int64 = 1 //nolint:gochecknoglobals + +func nextSeq() int { + return int(atomic.AddInt64(&seq, 1)) +} diff --git a/riverproui/internal/uitype/ui_api_types.go b/riverproui/internal/uitype/ui_api_types.go index 0f844951..7490b871 100644 --- a/riverproui/internal/uitype/ui_api_types.go +++ b/riverproui/internal/uitype/ui_api_types.go @@ -13,6 +13,13 @@ type PartitionConfig struct { ByKind bool `json:"by_kind"` } +type RiverPeriodicJob struct { + ID string `json:"id"` + CreatedAt time.Time `json:"created_at"` + NextRunAt time.Time `json:"next_run_at"` + UpdatedAt time.Time `json:"updated_at"` +} + type RiverProducer struct { ID int64 `json:"id"` ClientID string `json:"client_id"` diff --git a/riverproui/pro_handler_test.go b/riverproui/pro_handler_test.go index c1945c93..a89afea9 100644 --- a/riverproui/pro_handler_test.go +++ b/riverproui/pro_handler_test.go @@ -17,6 +17,7 @@ import ( "github.com/riverqueue/river/riverdriver" "riverqueue.com/riverpro" + "riverqueue.com/riverpro/driver" "riverqueue.com/riverpro/driver/riverpropgxv5" "riverqueue.com/riverui" @@ -24,6 +25,7 @@ import ( "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" "riverqueue.com/riverui/internal/uicommontest" + "riverqueue.com/riverui/riverproui/internal/protestfactory" "riverqueue.com/riverui/uiendpoints" ) @@ -71,6 +73,11 @@ func TestProHandlerIntegration(t *testing.T) { testRunner := func(exec riverdriver.Executor, makeAPICall handlertest.APICallFunc) { ctx := context.Background() + proExec, ok := exec.(driver.ProExecutor) + require.True(t, ok) + + _ = protestfactory.PeriodicJob(ctx, t, proExec, nil) + queue := testfactory.Queue(ctx, t, exec, nil) workflowID := uuid.New() @@ -81,6 +88,7 @@ func TestProHandlerIntegration(t *testing.T) { // Verify OSS features endpoint is mounted and returns success even w/ Pro bundle: makeAPICall(t, "FeaturesGet", http.MethodGet, "/api/features", nil) + makeAPICall(t, "PeriodicJobList", http.MethodGet, "/api/pro/periodic-jobs", nil) makeAPICall(t, "ProducerList", http.MethodGet, "/api/pro/producers?queue_name="+queue.Name, nil) makeAPICall(t, "WorkflowCancel", http.MethodPost, fmt.Sprintf("/api/pro/workflows/%s/cancel", workflowID), nil) makeAPICall(t, "WorkflowGet", http.MethodGet, fmt.Sprintf("/api/pro/workflows/%s", workflowID2), nil) From 0c5b842bf77d41c6370fdefd53fe9e5c80637ae0 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Fri, 26 Sep 2025 09:34:54 -0500 Subject: [PATCH 3/5] fix concurrency settings w/ a single paused producer Also fix Queue stories that depend on feature flags. --- .storybook/preview.tsx | 1 + src/components/QueueDetail.stories.tsx | 139 ++++++++++++++++++++++--- src/components/QueueDetail.tsx | 7 +- 3 files changed, 127 insertions(+), 20 deletions(-) diff --git a/.storybook/preview.tsx b/.storybook/preview.tsx index 251c68b6..2fe3c706 100644 --- a/.storybook/preview.tsx +++ b/.storybook/preview.tsx @@ -26,6 +26,7 @@ export const withFeatures: Decorator = (StoryFn, context) => { // Default features with story-specific overrides const features = { hasProducerTable: true, + producerQueries: true, ...context.parameters?.features, }; diff --git a/src/components/QueueDetail.stories.tsx b/src/components/QueueDetail.stories.tsx index 0620d357..1adcff6b 100644 --- a/src/components/QueueDetail.stories.tsx +++ b/src/components/QueueDetail.stories.tsx @@ -1,8 +1,10 @@ +import { useFeatures } from "@contexts/Features.hook"; import { type Producer } from "@services/producers"; import { type ConcurrencyConfig } from "@services/queues"; import { Meta, StoryObj } from "@storybook/react-vite"; import { producerFactory } from "@test/factories/producer"; import { queueFactory } from "@test/factories/queue"; +import { createFeatures } from "@test/utils/features"; import QueueDetail from "./QueueDetail"; @@ -88,25 +90,25 @@ export const QueueNotFound: Story = { }, }; -// Active queue with no producers (features disabled) -export const ActiveQueueWithoutPro: Story = { - args: { - producers: [], - queue: queueFactory.active().build(), - }, - parameters: { - features: { - hasProducerTable: false, - }, - }, -}; - // Active queue with no producers (features enabled) export const ActiveQueueNoProducers: Story = { args: { producers: [], queue: queueFactory.active().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, }; // Paused queue with no producers @@ -115,6 +117,19 @@ export const PausedQueueNoProducers: Story = { producers: [], queue: queueFactory.paused().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, }; // Active queue with producers @@ -123,6 +138,19 @@ export const ActiveQueueWithProducers: Story = { producers: createProducers(5, "test-queue"), queue: queueFactory.active().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, }; // Paused queue with some paused producers @@ -131,18 +159,37 @@ export const PausedQueueWithMixedProducers: Story = { producers: createProducers(5, "test-queue", { paused: true }), queue: queueFactory.paused().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, }; // Queue with concurrency settings -export const QueueWithConcurrencySettings: Story = { +export const WithConcurrencySettings: Story = { args: { producers: createProducers(3, "test-queue", { withConcurrency: true }), queue: queueFactory.withConcurrency().build(), }, + parameters: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, }; // Queue with inconsistent producer concurrency settings -export const QueueWithInconsistentConcurrency: Story = { +export const InconsistentConcurrency: Story = { args: { producers: createProducers(3, "test-queue", { inconsistentConcurrency: true, @@ -150,12 +197,72 @@ export const QueueWithInconsistentConcurrency: Story = { }), queue: queueFactory.withConcurrency().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, }; // Queue with many producers -export const QueueWithManyProducers: Story = { +export const ManyProducers: Story = { args: { producers: createProducers(20, "test-queue", { paused: true }), queue: queueFactory.active().build(), }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, +}; + +// Queue with single paused producer (should not show concurrency warning) +export const SinglePausedProducer: Story = { + args: { + producers: createProducers(1, "test-queue", { paused: true }), + queue: queueFactory.active().build(), + }, + parameters: { + mockData: [ + { + hook: useFeatures, + mockValue: { + features: createFeatures({ + hasProducerTable: true, + producerQueries: true, + }), + }, + }, + ], + }, +}; + +// Pro features disabled +export const WithoutPro: Story = { + args: { + producers: [], + queue: queueFactory.active().build(), + }, + parameters: { + features: createFeatures({ + hasProducerTable: false, + }), + }, }; diff --git a/src/components/QueueDetail.tsx b/src/components/QueueDetail.tsx index c38e7c56..9c218adb 100644 --- a/src/components/QueueDetail.tsx +++ b/src/components/QueueDetail.tsx @@ -252,14 +252,13 @@ const ConcurrencySettings = ({ const producerConcurrencyStatus = useMemo(() => { if (!producers || producers.length === 0) - return { config: null, consistent: false }; + return { config: null, consistent: true }; // Filter out paused producers const activeProducers = producers.filter((p) => !p.pausedAt); - // If there are no active producers, return null config - if (activeProducers.length === 0) - return { config: null, consistent: false }; + // If there are no active producers, return null config but consider it consistent + if (activeProducers.length === 0) return { config: null, consistent: true }; const firstProducer = activeProducers[0]; const allSame = activeProducers.every((p) => { From 6977cde6b1652148319f6f9041ffa9acd79f3686 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 27 Sep 2025 20:24:50 -0500 Subject: [PATCH 4/5] implement periodic job list view --- .../prohandler/pro_handler_api_endpoints.go | 5 +- src/components/Layout.tsx | 20 +-- src/components/PeriodicJobList.stories.tsx | 138 +++++++++++++++++ src/components/PeriodicJobList.tsx | 145 ++++++++++++++++++ src/components/PeriodicJobListEmptyState.tsx | 81 ++++++++++ src/components/PeriodicJobsPage.tsx | 145 ++++++++++++++++++ src/routeTree.gen.ts | 21 +++ src/routes/periodic-jobs/index.tsx | 47 ++++++ src/services/periodicJobs.ts | 45 ++++++ 9 files changed, 636 insertions(+), 11 deletions(-) create mode 100644 src/components/PeriodicJobList.stories.tsx create mode 100644 src/components/PeriodicJobList.tsx create mode 100644 src/components/PeriodicJobListEmptyState.tsx create mode 100644 src/components/PeriodicJobsPage.tsx create mode 100644 src/routes/periodic-jobs/index.tsx create mode 100644 src/services/periodicJobs.ts diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints.go b/riverproui/internal/prohandler/pro_handler_api_endpoints.go index de0bc428..36d8256f 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints.go @@ -78,8 +78,9 @@ func (req *periodicJobListRequest) ExtractRaw(r *http.Request) error { func (a *periodicJobListEndpoint[TTx]) Execute(ctx context.Context, req *periodicJobListRequest) (*listResponse[uitype.RiverPeriodicJob], error) { result, err := a.DB.PeriodicJobGetAll(ctx, &riverprodriver.PeriodicJobGetAllParams{ - Max: ptrutil.ValOrDefault(req.Limit, 100), - Schema: a.Client.Schema(), + Max: ptrutil.ValOrDefault(req.Limit, 100), + Schema: a.Client.Schema(), + StaleUpdatedAtHorizon: time.Now().Add(-24 * time.Hour), }) if err != nil { return nil, fmt.Errorf("error listing periodic jobs: %w", err) diff --git a/src/components/Layout.tsx b/src/components/Layout.tsx index 9bda8b01..d7b4c3bd 100644 --- a/src/components/Layout.tsx +++ b/src/components/Layout.tsx @@ -1,4 +1,5 @@ import Toast from "@components/Toast"; +import { useFeatures } from "@contexts/Features.hook"; import { useSidebarSetting } from "@contexts/SidebarSetting.hook"; import { Dialog, @@ -7,13 +8,13 @@ import { TransitionChild, } from "@headlessui/react"; import { + CalendarDaysIcon, Cog6ToothIcon, InboxStackIcon, QueueListIcon, RectangleGroupIcon, XMarkIcon, } from "@heroicons/react/24/outline"; -import useFeature from "@hooks/use-feature"; import { Link } from "@tanstack/react-router"; import { Fragment, PropsWithChildren, useMemo } from "react"; @@ -22,7 +23,7 @@ type LayoutProps = PropsWithChildren; const Layout = ({ children }: LayoutProps) => { const { open: sidebarOpen, setOpen: setSidebarOpen } = useSidebarSetting(); - const featureEnabledWorkflows = useFeature("ENABLE_WORKFLOWS", true); + const { features } = useFeatures(); const navigation = useMemo( () => @@ -34,14 +35,15 @@ const Layout = ({ children }: LayoutProps) => { search: {}, }, { href: "/queues", icon: InboxStackIcon, name: "Queues" }, + { href: "/workflows", icon: RectangleGroupIcon, name: "Workflows" }, { - hidden: !featureEnabledWorkflows, - href: "/workflows", - icon: RectangleGroupIcon, - name: "Workflows", + hidden: !features.durablePeriodicJobs, + href: "/periodic-jobs", + icon: CalendarDaysIcon, + name: "Periodic Jobs", }, ].filter((item) => item.hidden === undefined || item.hidden === false), - [featureEnabledWorkflows], + [features.durablePeriodicJobs], ); return ( @@ -163,8 +165,8 @@ const Layout = ({ children }: LayoutProps) => { {/* Static sidebar for desktop */} -
-
+
+