diff --git a/client_test.go b/client_test.go index e1fbd395..067c1cc9 100644 --- a/client_test.go +++ b/client_test.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "strconv" "strings" "sync" "testing" @@ -40,6 +41,7 @@ func waitForClientHealthy(ctx context.Context, t *testing.T, statusUpdateCh <-ch for { select { case status := <-statusUpdateCh: + t.Logf("Client status: elector=%d notifier=%d producers=%+v", status.Elector, status.Notifier, status.Producers) if status.Healthy() { return } @@ -80,6 +82,9 @@ type callbackFunc func(context.Context, *Job[callbackArgs]) error func makeAwaitCallback(startedCh chan<- int64, doneCh chan struct{}) callbackFunc { return func(ctx context.Context, job *Job[callbackArgs]) error { + client := ClientFromContext[pgx.Tx](ctx) + client.config.Logger.InfoContext(ctx, "callback job started with id="+strconv.FormatInt(job.ID, 10)) + select { case <-ctx.Done(): return ctx.Err() @@ -2412,11 +2417,12 @@ func Test_Client_InsertTriggersImmediateWork(t *testing.T) { client := newTestClient(t, dbPool, config) statusUpdateCh := client.monitor.RegisterUpdates() + startClient(ctx, t, client) + waitForClientHealthy(ctx, t, statusUpdateCh) + insertedJob, err := client.Insert(ctx, callbackArgs{}, nil) require.NoError(err) - startClient(ctx, t, client) - // Wait for the client to be ready by waiting for a job to be executed: select { case jobID := <-startedCh: @@ -2424,7 +2430,6 @@ func Test_Client_InsertTriggersImmediateWork(t *testing.T) { case <-ctx.Done(): t.Fatal("timed out waiting for warmup job to start") } - waitForClientHealthy(ctx, t, statusUpdateCh) // Now that we've run one job, we shouldn't take longer than the cooldown to // fetch another after insertion. LISTEN/NOTIFY should ensure we find out diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 7b8e0040..1c50ae81 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -69,7 +69,9 @@ func DatabaseConfig(databaseName string) *pgxpool.Config { panic(fmt.Sprintf("error parsing database URL: %v", err)) } config.MaxConns = dbPoolMaxConns - config.ConnConfig.ConnectTimeout = 10 * time.Second + // Use a short conn timeout here to attempt to quickly cancel attempts that + // are unlikely to succeed even with more time: + config.ConnConfig.ConnectTimeout = 2 * time.Second config.ConnConfig.RuntimeParams["timezone"] = "UTC" return config }