Description
Since upgrading from 0.6.0 to 0.7.0, I'm seeing the following error when shutting down the river client:
```
panic: send on closed channel
goroutine 63 [running]:
github.com/riverqueue/river/internal/jobcompleter.(*BatchCompleter).handleBatch(0x140001423c0, {0x101078580, 0x140006973c0})
	/Users/foo/go/pkg/mod/github.com/riverqueue/river@v0.7.0/internal/jobcompleter/job_completer.go:420 +0x2ec
github.com/riverqueue/river/internal/jobcompleter.(*BatchCompleter).Start.func1()
	/Users/foo/go/pkg/mod/github.com/riverqueue/river@v0.7.0/internal/jobcompleter/job_completer.go:314 +0x37c
created by github.com/riverqueue/river/internal/jobcompleter.(*BatchCompleter).Start in goroutine 1
	/Users/foo/go/pkg/mod/github.com/riverqueue/river@v0.7.0/internal/jobcompleter/job_completer.go:270 +0x120
make: *** [run-worker] Error 2
```
or
```
panic: send on closed channel
goroutine 324 [running]:
github.com/riverqueue/river/internal/jobcompleter.(*AsyncCompleter).JobSetStateIfRunning.func1()
	/Users/foo/go/pkg/mod/github.com/riverqueue/river@v0.7.0/internal/jobcompleter/job_completer.go:170 +0x110
golang.org/x/sync/errgroup.(*Group).Go.func1()
	/Users/foo/go/pkg/mod/golang.org/x/sync@v0.7.0/errgroup/errgroup.go:78 +0x58
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 273
	/Users/foo/go/pkg/mod/golang.org/x/sync@v0.7.0/errgroup/errgroup.go:75 +0x98
make: *** [run-worker] Error 2
```
I'm using the following `context.Context` when starting the client:
```go
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer stop()
// ...
if err := riverClient.Start(ctx); err != nil {
	return fmt.Errorf("failed to start river client: %w", err)
}
```
and the following code to shut the river client down (maybe I'm holding it wrong?):
```go
var wg errgroup.Group
wg.Go(func() error {
	<-ctx.Done()
	logger.Info("shutting down river client...")
	// Stop fetching new work and wait for active jobs to finish.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := riverClient.Stop(ctx); !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
})
```
So when the original context is canceled, the client gets 30 seconds to finish its active jobs and stop. This may not be the intended usage, but it worked with 0.6.0, and with 0.7.0 I'm seeing the panic.
I ran `git bisect`, and it points to commit 0e57338 as the likely cause. I'm not familiar enough with the code to suggest a fix, but I'm happy to help if you need anything.
Note: I haven't been able to reproduce this with a simple test case, so it's probably related to my specific usage:
- the workers use the same pool that the river client uses
- there are multiple `Queues` configured
- the workers use TCP to send emails via SMTP, and the test mail server is subjected to adverse network conditions using toxiproxy, so there is some blocking/waiting happening
Happy to explore this further. I'd appreciate it if you could give me some pointers on what to look for.