From faafe6a3377f1d29a27553e4a48071cf7d870f08 Mon Sep 17 00:00:00 2001 From: Brandur Date: Mon, 28 Apr 2025 00:29:49 -0700 Subject: [PATCH] Fix intermittent test `Test_Client_Maintenance/JobRescuer` Fix an intermittent test for the client maintenance services observed from time to time [1]. --- FAIL: Test_Client_Maintenance (0.00s) --- FAIL: Test_Client_Maintenance/JobRescuer (4.56s) client_test.go:3866: Generated schema "river_test_2025_04_26t22_05_48_schema_21" with migrations [1 2 3 4 5 6] on line "main" in 1.349501729s [21 generated] [14 reused] logger.go:256: time=2025-04-26T22:05:52.463Z level=INFO msg="River client started" client_id=fv-az2241-783_2025_04_26T22_05_52_429607 logger.go:256: time=2025-04-26T22:05:52.638Z level=ERROR msg="maintenance.JobRescuer: Attempted to rescue unhandled job kind, discarding" job_kind=fake_job job_id=6 logger.go:256: time=2025-04-26T22:05:53.228Z level=INFO msg="maintenance.JobRescuer: Ran successfully" num_jobs_discarded=1 num_jobs_retry_scheduled=2 logger.go:256: time=2025-04-26T22:05:53.253Z level=INFO msg="maintenance.JobScheduler: Ran successfully" num_jobs_scheduled=2 client_test.go:3912: Error Trace: /home/runner/work/river/river/client_test.go:3903 /home/runner/work/river/river/client_test.go:3912 Error: Not equal: expected: "retryable" actual : "completed" Diff: --- Expected +++ Actual @@ -1,2 +1,2 @@ -(rivertype.JobState) (len=9) "retryable" +(rivertype.JobState) (len=9) "completed" Test: Test_Client_Maintenance/JobRescuer logger.go:256: time=2025-04-26T22:05:55.631Z level=INFO msg="River client stopped" client_id=fv-az2241-783_2025_04_26T22_05_52_429607 riverdbtest.go:277: Checked in schema "river_test_2025_04_26t22_05_48_schema_21"; 1 idle schema(s) [22 generated] [15 reused] FAIL FAIL github.com/riverqueue/river 32.635s This one's similar to #852 in that it's possible for test case goroutine to be paused long enough (really only happens in GitHub Actions CI) that a job it moved to `retryable` actually becomes eligible to be worked, and may actually be worked in the interim before the test case picks up again to assert on it. The solution is to use the new `RetryPolicySlow` introduced in #852 to make sure that retryable jobs are scheduled far out in the future and never at risk of really being retried in the test case. We also make the slow retry interval's default a little longer. The test sets job's `attempted_at` (which its next `scheduled_at` will be based on) an hour in the past which is exactly what the slow retry interval was, making jobs still potentially immediately retryable. Make the slow interval two hours so contention is never possible. The good news is that this is the last intermittent test I've been tracking. I'm hoping that it along with the fixes from Saturday might be enough to take care of our intermittency problems for a while. [1] https://github.com/riverqueue/river/actions/runs/14685519857/job/41213612537 --- client_test.go | 10 +++++++--- .../retrypolicytest/retrypolicytest.go | 5 ++++- .../retrypolicytest/retrypolicytest_test.go | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 internal/riverinternaltest/retrypolicytest/retrypolicytest_test.go diff --git a/client_test.go b/client_test.go index c77e0d93..e9c9a54e 100644 --- a/client_test.go +++ b/client_test.go @@ -4001,6 +4001,7 @@ func Test_Client_Maintenance(t *testing.T) { t.Parallel() config := newTestConfig(t, "") + config.RetryPolicy = &retrypolicytest.RetryPolicySlow{} // make sure jobs aren't worked before we can assert on job config.RescueStuckJobsAfter = 5 * time.Minute client, bundle := setup(t, config) @@ -4036,11 +4037,12 @@ func Test_Client_Maintenance(t *testing.T) { svc.TestSignals.FetchedBatch.WaitOrTimeout() svc.TestSignals.UpdatedBatch.WaitOrTimeout() - requireJobHasState := func(jobID int64, state rivertype.JobState) { + requireJobHasState := func(jobID int64, state rivertype.JobState) *rivertype.JobRow { t.Helper() job, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: jobID, Schema: bundle.schema}) require.NoError(t, err) require.Equal(t, state, job.State) + return job } // unchanged @@ -4049,8 +4051,10 @@ func Test_Client_Maintenance(t *testing.T) { requireJobHasState(ineligibleJob3.ID, ineligibleJob3.State) // Jobs to retry should be retryable: - requireJobHasState(jobStuckToRetry1.ID, rivertype.JobStateRetryable) - requireJobHasState(jobStuckToRetry2.ID, rivertype.JobStateRetryable) + updatedJobStuckToRetry1 := requireJobHasState(jobStuckToRetry1.ID, rivertype.JobStateRetryable) + require.Greater(t, updatedJobStuckToRetry1.ScheduledAt, now.Add(10*time.Minute)) // make sure `scheduled_at` is a good margin in the future so it's not at risk of immediate retry (which could cause intermittent test issues) + updatedJobStuckToRetry2 := requireJobHasState(jobStuckToRetry2.ID, rivertype.JobStateRetryable) + require.Greater(t, updatedJobStuckToRetry2.ScheduledAt, now.Add(10*time.Minute)) // This one should be discarded because it's already at MaxAttempts: requireJobHasState(jobStuckToDiscard.ID, rivertype.JobStateDiscarded) diff --git a/internal/riverinternaltest/retrypolicytest/retrypolicytest.go b/internal/riverinternaltest/retrypolicytest/retrypolicytest.go index 7b761223..05598543 100644 --- a/internal/riverinternaltest/retrypolicytest/retrypolicytest.go +++ b/internal/riverinternaltest/retrypolicytest/retrypolicytest.go @@ -59,7 +59,10 @@ func (p *RetryPolicyNoJitter) retrySecondsWithoutJitter(attempt int) float64 { return min(retrySeconds, maxDurationSeconds) } -const retryPolicySlowInterval = 1 * time.Hour +// Choose a number here that's larger than the job rescuer's rescue interval of +// one hour so that can easily use it to test the job rescuer (in addition to +// other things). +const retryPolicySlowInterval = 2 * time.Hour // RetryPolicySlow is a retry policy that has a very slow retry interval. This // is used in tests that check retries to make sure that in slower environments diff --git a/internal/riverinternaltest/retrypolicytest/retrypolicytest_test.go b/internal/riverinternaltest/retrypolicytest/retrypolicytest_test.go new file mode 100644 index 00000000..768513bc --- /dev/null +++ b/internal/riverinternaltest/retrypolicytest/retrypolicytest_test.go @@ -0,0 +1,17 @@ +package retrypolicytest + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRetryPolicySlowIntervalSufficientlyLarge(t *testing.T) { + t.Parallel() + + // Should be a good margin larger than the job rescuer's default interval so + // that it can easily be used to test the job rescuer with making jobs + // accidentally eligible to work again right away. + require.Greater(t, (&RetryPolicySlow{}).Interval(), 1*time.Hour) +}