extract Client subscriptions into service #379
Force-pushed from e5bfb62 to 2c18e0c
The test panic is almost certainly specific to how certain tests are structured and not reflective of anything we’ll hit in real usage. I’ll see if I can pin it down tomorrow.
Force-pushed from 980bf19 to 57d4c03
brandur left a comment
Nice, it definitely feels better breaking some logic out of the client, which is already way too big.
Can you add subscription_manager_test.go and put in some basic test cases? It'll be nice because it lets us test the struct in relative isolation. Probably just need a basic check or two and the standard StartStopStress.
subscription_manager.go (outdated)
}

func (sm *subscriptionManager) Start(ctx context.Context) error {
	sm.wg.Add(1)
Can we make this use the normal stop convention instead with StartInit? Protects against double starts, but also gives the client a way to check whether all its services are started.
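For reference, the convention being requested looks roughly like this (a minimal sketch; `myService` is hypothetical, and the `startstop` import path is assumed):

package river

import (
	"context"

	"github.com/riverqueue/river/internal/maintenance/startstop" // import path assumed
)

type myService struct {
	startstop.BaseStartStop
}

func (s *myService) Start(ctx context.Context) error {
	// StartInit protects against double starts: shouldStart comes back false
	// if the service is already running.
	ctx, shouldStart, stopped := s.StartInit(ctx)
	if !shouldStart {
		return nil
	}

	go func() {
		// Closing stopped is what lets Stop() unblock.
		defer close(stopped)

		<-ctx.Done() // the service's run loop goes here
	}()

	return nil
}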
I think I've made the appropriate changes for this to use startstop.BaseStartStop, please take a look to see if I did so correctly!
Force-pushed from 57d4c03 to 41c1e63
@brandur regarding the test issue: I don't think there's an obvious way to extend the existing test to handle this different use case; however, I will add some standalone test coverage to exercise the correct usage of this service.
Force-pushed from 41c1e63 to 65d7995
@brandur should be good to go, please take another look 🙏
@bgentry So I think the difficulty of getting the stress test working is somewhat revealing of a more fundamental problem: as written, the manager basically doesn't respond to Stop. Stop can be called, but it'll never do anything unless the subscription channel is closed separately.
What do you think about this implementation instead? It modifies things so that the manager respects a stop, but in the case of one, makes sure to clear the subscription channel before leaving, which means that it still correctly clears all events on a client shutdown. This gives us a way to use the stress test because all we need to do is close the channel in advance before calling startstoptest.Stress (the service is still a little weird compared to others because it requires that channel close, but a little less so).
Also normalizes things a bit by removing the custom Stop implementation, which most services shouldn't need.
func (sm *subscriptionManager) Start(ctx context.Context) error {
	ctx, shouldStart, stopped := sm.StartInit(ctx)
	if !shouldStart {
		return nil
	}

	go func() {
		// This defer should come first so that it's last out, thereby avoiding
		// races.
		defer close(stopped)

		sm.Logger.DebugContext(ctx, sm.Name+": Run loop started")
		defer sm.Logger.DebugContext(ctx, sm.Name+": Run loop stopped")

		// On shutdown, close and remove all active subscriptions.
		defer func() {
			sm.mu.Lock()
			defer sm.mu.Unlock()

			for subID, sub := range sm.subscriptions {
				close(sub.Chan)
				delete(sm.subscriptions, subID)
			}
		}()

		for {
			select {
			case <-ctx.Done():
				// Distribute remaining subscriptions until the channel is
				// closed. This does make the subscription manager a little
				// problematic in that it requires the subscription channel to
				// be closed before it will fully stop. This always happens in
				// the case of a real client by virtue of the completer always
				// stopping at the same time as the subscription manager, but
				// one has to be careful in tests.
				sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed")
				for updates := range sm.subscribeCh {
					sm.distributeJobUpdates(updates)
				}

				return

			case updates := <-sm.subscribeCh:
				sm.distributeJobUpdates(updates)
			}
		}
	}()

	return nil
}

Here's a patchable diff including the addition of an extra test case:
diff --git a/subscription_manager.go b/subscription_manager.go
index 1d74a6a..e1df69a 100644
--- a/subscription_manager.go
+++ b/subscription_manager.go
@@ -44,7 +44,7 @@ func (sm *subscriptionManager) ResetSubscribeChan(subscribeCh <-chan []jobcomple
 }
 
 func (sm *subscriptionManager) Start(ctx context.Context) error {
-	_, shouldStart, stopped := sm.StartInit(ctx)
+	ctx, shouldStart, stopped := sm.StartInit(ctx)
 	if !shouldStart {
 		return nil
 	}
@@ -54,34 +54,44 @@ func (sm *subscriptionManager) Start(ctx context.Context) error {
 		// races.
 		defer close(stopped)
 
-		for updates := range sm.subscribeCh {
-			sm.distributeJobUpdates(updates)
-		}
-	}()
-
-	return nil
-}
-
-func (sm *subscriptionManager) Stop() {
-	shouldStop, stopped, finalizeStop := sm.StopInit()
-	if !shouldStop {
-		return
-	}
+		sm.Logger.DebugContext(ctx, sm.Name+": Run loop started")
+		defer sm.Logger.DebugContext(ctx, sm.Name+": Run loop stopped")
 
-	<-stopped
+		// On shutdown, close and remove all active subscriptions.
+		defer func() {
+			sm.mu.Lock()
+			defer sm.mu.Unlock()
 
-	// Remove all subscriptions and close corresponding channels.
-	func() {
-		sm.mu.Lock()
-		defer sm.mu.Unlock()
+			for subID, sub := range sm.subscriptions {
+				close(sub.Chan)
+				delete(sm.subscriptions, subID)
+			}
+		}()
 
-		for subID, sub := range sm.subscriptions {
-			close(sub.Chan)
-			delete(sm.subscriptions, subID)
+		for {
+			select {
+			case <-ctx.Done():
+				// Distribute remaining subscriptions until the channel is
+				// closed. This does make the subscription manager a little
+				// problematic in that it requires the subscription channel to
+				// be closed before it will fully stop. This always happens in
+				// the case of a real client by virtue of the completer always
+				// stopping at the same time as the subscription manager, but
+				// one has to be careful in tests.
+				sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed")
+				for updates := range sm.subscribeCh {
+					sm.distributeJobUpdates(updates)
+				}
+
+				return
+
+			case updates := <-sm.subscribeCh:
+				sm.distributeJobUpdates(updates)
+			}
 		}
 	}()
 
-	finalizeStop(true)
+	return nil
 }
 
 func (sm *subscriptionManager) logStats(ctx context.Context, svcName string) {
diff --git a/subscription_manager_test.go b/subscription_manager_test.go
index 5406d4a..be00500 100644
--- a/subscription_manager_test.go
+++ b/subscription_manager_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/riverqueue/river/internal/jobcompleter"
 	"github.com/riverqueue/river/internal/jobstats"
 	"github.com/riverqueue/river/internal/riverinternaltest"
+	"github.com/riverqueue/river/internal/riverinternaltest/startstoptest"
 	"github.com/riverqueue/river/internal/riverinternaltest/testfactory"
 	"github.com/riverqueue/river/internal/util/ptrutil"
 	"github.com/riverqueue/river/riverdriver"
@@ -113,7 +114,7 @@ func Test_SubscriptionManager(t *testing.T) {
 		subscribeCh := bundle.subscribeCh
 		for i := 0; i < 100; i++ {
-			go func() { close(subscribeCh) }()
+			close(subscribeCh)
 
 			manager.Stop()
 
 			subscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 1)
@@ -123,4 +124,16 @@ func Test_SubscriptionManager(t *testing.T) {
 		}
 
 		close(subscribeCh)
 	})
+
+	t.Run("StartStopStress", func(t *testing.T) {
+		t.Parallel()
+
+		svc, bundle := setup(t)
+
+		// Close the subscription channel in advance so that stops can leave
+		// successfully.
+		close(bundle.subscribeCh)
+
+		startstoptest.Stress(ctx, t, svc)
+	})
 }
subscription_manager_test.go (outdated)
subscribeCh := bundle.subscribeCh
for i := 0; i < 100; i++ {
	go func() { close(subscribeCh) }()
This is racy because the goroutine can run after subscribeCh is reset below, which will result in a panic on double-channel close.
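A stripped-down illustration of that race (hypothetical code, not the PR's test; running it under `go run -race` flags the unsynchronized access):

package main

func main() {
	ch := make(chan struct{})

	for i := 0; i < 1000; i++ {
		// The goroutine captures the variable ch, not the value it held when
		// the goroutine was spawned. If the goroutine is scheduled after the
		// reassignment below, it closes the *new* channel, and the next
		// iteration's goroutine can then close that same channel a second
		// time: panic: close of closed channel.
		go func() { close(ch) }()

		ch = make(chan struct{})
	}
}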
@brandur it's possible I'm misunderstanding it, but from what I'm seeing your proposal doesn't guarantee that the subscription manager will wait for all events prior to shutting down. That was the whole purpose of the channel, and it was the reason that the subscription manager shutdown was previously a separate step after the services were shut down: being able to know that there are no more events coming, and therefore that if we start closing client subscriptions they will not miss a single event (even after shutdown has begun).

That particular property is one I think we need to preserve, as some of the upcoming stuff needs to be able to ensure that it has consumed and handled all incoming events for all processed jobs prior to stopping.

Might be easier to discuss offline?
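The guarantee being described relies on the channel close acting as the completion signal: because the completer is the sole sender and closes the channel only after its final send, a receiver that drains until close has necessarily seen every event. A minimal self-contained sketch (hypothetical types; the real element type is `[]jobcompleter.CompleterJobUpdated`):

package main

import "fmt"

type jobUpdate struct{ jobID int }

func main() {
	subscribeCh := make(chan []jobUpdate)

	// Sender (the completer's role): send every remaining event, then close
	// to signal that no more are coming.
	go func() {
		for i := 0; i < 3; i++ {
			subscribeCh <- []jobUpdate{{jobID: i}}
		}
		close(subscribeCh)
	}()

	// Receiver (the subscription manager's role): this range only exits once
	// every sent batch has been received *and* the channel is closed, so no
	// event can be missed before subscriptions are torn down.
	for updates := range subscribeCh {
		fmt.Println("distributing", len(updates), "updates")
	}
}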
This change extracts the Client subscriptions logic into a separate `startstop.Service` which can be started and stopped along with the other services. The important change that enables this is switching from a _callback_ for job events to a _channel_ for job events. The channel is passed to the completer during init, and the completer then owns it as the sole sender. When the completer is stopped, it must close the channel to indicate that there are no more job completion events to be processed.

This moves us closer to having all the key client services be able to be managed as a single pool of services, and they can all have their shutdown initiated in parallel. Importantly, this paves the way for additional services to be added (even by external libraries) without needing to deal with more complex startup & shutdown ordering scenarios.

In order to make this work with a client that can be started and stopped repeatedly, a new `ResetSubscribeChan` method was added to the `JobCompleter` interface to be called at the beginning of each `Client.Start()` call.
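A sketch of how that wiring could look on each start (a hypothetical simplification of `Client.Start`; the `ResetSubscribeChan` signature comes from the diff above, and the buffer size is illustrative):

// Each Client.Start creates a fresh channel so the client can be started and
// stopped repeatedly. The completer becomes the channel's sole sender and the
// subscription manager its receiver.
subscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 100) // buffer size illustrative
c.completer.ResetSubscribeChan(subscribeCh)
c.subscriptionManager.ResetSubscribeChan(subscribeCh)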
Respect a stop, but in the case of one, make sure to clear the subscription channel before leaving, which means that it still correctly clears all events on a client shutdown. This gives us a way to use the stress test because all we need to do is close the channel in advance before calling startstoptest.Stress (the service is still a little weird compared to others because it requires that channel close, but a little less so).

Also normalizes things a bit by removing the custom Stop implementation, which most services shouldn't need.

Co-Authored-By: Brandur Leach <brandur@brandur.org>
Force-pushed from 65d7995 to 01eb6e1
@brandur I definitely misread the patch on the first pass and it in fact looks good 🙏 I've applied it here.
* add failing test case for completer shutdown panic

This failing test case exposes the issue in #400 100% of the time, which is caused by the `stopProducers()` call not actually waiting until the producers are fully shut down before proceeding with the remaining shutdown.

* fix shutdown panics by separating completer context

Back in #258 / 702d5b2, the batch completer was added to improve throughput. As part of that refactor, it was turned into a startstop service that took a context on start. We took care to ensure that the context provided to the completer was _not_ the `fetchCtx` (cancelled on `Stop()`) but instead was the raw user-provided `ctx`, specifically to make sure the completer could finish its work even after fetches were stopped.

This worked well if the whole shutdown process was done with `Stop` / `StopAndCancel`, but it did not work if the user-provided context was itself cancelled outside of River. In that scenario, the completer would immediately begin shutting down upon cancellation, even without waiting for producers to finish sending it any final jobs that needed to be recorded. This went unnoticed until #379 / 0e57338 turned this scenario into a panic instead of a silent misbehavior, which is what was encountered in #400.

To fix this situation, we need to use Go 1.21's new `context.WithoutCancel` API to fork the user-provided context so that we maintain whatever else is stored in there (i.e. so anything used by slog is still available) but we do not cancel this completer's context _ever_. The completer will manage its own shutdown when its `Stop()` is called as part of all of the other client services being stopped in parallel.

Fixes #400.
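A sketch of the context fork described here (simplified; the surrounding client code is paraphrased, not taken from the PR):

// context.WithoutCancel (Go 1.21+) keeps the parent's values (e.g. anything
// stored for slog) but never inherits its cancellation, so the completer only
// ever stops via its own Stop().
completerCtx := context.WithoutCancel(userProvidedCtx)
if err := c.completer.Start(completerCtx); err != nil {
	return err
}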
This change extracts the Client subscriptions logic into a separate `startstop.Service` which can be started and stopped along with the other services. The important change that enables this is switching from a _callback_ for job events to a _channel_ for job events. The channel is passed to the completer during init, and the completer then owns it as the sole sender. When the completer is stopped, it must close the channel to indicate that there are no more job completion events to be processed.

This moves us closer to having all the key client services be able to be managed as a single pool of services, and they can all have their shutdown initiated in parallel (even though some must still wait for others to shut down first). Importantly, this paves the way for additional services to be added (even by external libraries) without needing to deal with more complex startup & shutdown ordering scenarios. I leveraged the `startstop.BaseStartStop` type in all the completers to make this work cleanly.

Additionally, I updated the payload type for completer callbacks/notifications. Rather than sending a single job each time, they can now send slices of events. This is great for the batch completer, because it doesn't have to deal with mutexes or channel sends for every individual job it completed; instead it can send the whole batch through at once.

Currently based on #377.
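A fragment contrasting the two payload shapes (illustrative names `subscribeFn` and `subscribeCh`; only the slice element type comes from the PR):

// Before: a per-job callback, invoked once for every completion, each call
// paying its own synchronization cost.
for _, update := range updates {
	subscribeFn(update)
}

// After: the subscribe channel carries slices, so the batch completer pushes
// an entire completed batch through with a single send.
subscribeCh <- updates // updates is a []jobcompleter.CompleterJobUpdated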
Currently based on #377.