Adding a new Deallocator component
#2759
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -122,6 +122,16 @@ message Service { | |
| // UpdateStatus contains the status of an update, if one is in | ||
| // progress. | ||
| UpdateStatus update_status = 5; | ||
|
|
||
| // PendingDelete indicates that this service's deletion has been requested. | ||
| // Services, as well as all service-level resources, can only be deleted | ||
| // after all of the service's containers have properly shut down. | ||
| // When a user requests a deletion, we just flip this flag; | ||
| // the deallocator will take it from there: it will start monitoring | ||
| // this service's tasks, and proceed to delete the service itself (and | ||
| // potentially its associated resources also marked for deletion) when | ||
| // all of its tasks are gone. | ||
| bool pending_delete = 7; | ||
| } | ||
|
|
||
| // Endpoint specified all the network parameters required to | ||
|
|
@@ -292,6 +302,20 @@ message Network { | |
| // Runtime state of IPAM options. This may not reflect the | ||
| // ipam options from NetworkSpec. | ||
| IPAMOptions ipam = 5 [(gogoproto.customname) = "IPAM"]; | ||
|
|
||
| // PendingDelete indicates that this network's deletion has been requested. | ||
| // Services, as well as all service-level resources, can only be deleted | ||
| // after all the service's containers have properly shut down | ||
| // when a user requests a deletion, we just flip this flag | ||
| // the deallocator will take it from there | ||
|
Contributor
same as above.
Contributor
Author
👍 |
||
| // PendingDelete indicates that this network's deletion has been requested. | ||
| // Services, as well as all service-level resources, can only be deleted | ||
| // after all of the service's containers have properly shut down. | ||
| // When a user requests a deletion of this network, we just flip this flag; | ||
| // the deallocator will take it from there: it will start monitoring | ||
| // the services that still use this network, and proceed to delete | ||
| // this network when all of these services are gone. | ||
| bool pending_delete = 6; | ||
|
dperny marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // Cluster provides global cluster settings. | ||
|
|
||
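The two `pending_delete` fields above describe a two-step removal: the user's request only flips a flag, and the actual delete happens later, once nothing depends on the object any more. A minimal sketch of that idea follows. The `service` and `store` types and the `requestDelete` and `reap` functions are hypothetical stand-ins invented for this illustration, not swarmkit APIs.

```go
package main

import "fmt"

// service is a hypothetical stand-in for api.Service; only the fields
// needed for the illustration are modelled.
type service struct {
	ID            string
	PendingDelete bool
}

// store is a hypothetical in-memory stand-in for the real store.
type store struct {
	services map[string]*service
	tasks    map[string]int // service ID -> number of tasks still present
}

// requestDelete is what a user-facing removal call would do under this
// scheme: it only flips the flag and returns immediately.
func (s *store) requestDelete(id string) bool {
	svc, ok := s.services[id]
	if !ok {
		return false
	}
	svc.PendingDelete = true
	return true
}

// reap plays the role of the deallocator: it removes a flagged service,
// but only once none of its tasks remain. It reports whether it deleted.
func (s *store) reap(id string) bool {
	svc, ok := s.services[id]
	if !ok || !svc.PendingDelete || s.tasks[id] > 0 {
		return false
	}
	delete(s.services, id)
	delete(s.tasks, id)
	return true
}

func main() {
	s := &store{
		services: map[string]*service{"svc1": {ID: "svc1"}},
		tasks:    map[string]int{"svc1": 1},
	}
	s.requestDelete("svc1")
	fmt.Println(s.reap("svc1")) // a task is still present, nothing is deleted
	s.tasks["svc1"] = 0
	fmt.Println(s.reap("svc1")) // last task gone, the service is deleted
}
```

The point of the split is that the request path never blocks on container shutdown; only the background component decides when deletion is safe.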
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,291 @@ | ||
| package deallocator | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "github.com/docker/go-events" | ||
| "github.com/docker/swarmkit/api" | ||
| "github.com/docker/swarmkit/log" | ||
| "github.com/docker/swarmkit/manager/state/store" | ||
| ) | ||
|
|
||
| // Deallocator waits for services to fully shut down (i.e. no containers left) | ||
| // and then proceeds to deallocate service-level resources (e.g. networks), | ||
| // and finally services themselves. | ||
| // In particular, the Deallocator should be the only place where services, or | ||
| // service-level resources, are ever deleted! | ||
| // | ||
| // It’s worth noting that this new component’s role is quite different from | ||
| // the task reaper’s: tasks are purely internal to Swarmkit, and their status | ||
| // is entirely managed by the system itself. In contrast, the deallocator is | ||
| // responsible for safely deleting entities that are directly controlled by the | ||
| // user. | ||
|
dperny marked this conversation as resolved.
|
||
| // | ||
| // NOTE: since networks are the only service-level resources as of now, | ||
| // it has been deemed over-engineered to have a generic way to | ||
| // handle other types of service-level resources; if we ever start | ||
| // having more of those and thus want to reconsider this choice, it | ||
| // might be worth having a look at this archived branch, which does | ||
| // implement a way of separating the code for the deallocator itself | ||
| // from each resource-specific way of handling it: | ||
| // https://github.com/docker/swarmkit/compare/a84c01f49091167dd086c26b45dc18b38d52e4d9...wk8:wk8/generic_deallocator#diff-75f4f75eee6a6a7a7268c672203ea0ac | ||
| type Deallocator struct { | ||
| store *store.MemoryStore | ||
|
|
||
| // for services that are shutting down, we keep track of how many | ||
| // tasks still exist for them | ||
| services map[string]*serviceWithTaskCounts | ||
|
Collaborator
why map service IDs to both the service object AND the count? what does keeping a pointer to the service object get us, as opposed to having a
Contributor
Author
We do need the service itself at deallocation time. Saves a lookup. I guess we could do away with that, but seems like a pretty minor choice either way?
Contributor
Author
@anshulpundir : isn't saving a lookup a strong enough reason?
Contributor
One extra lookup for each service delete is not that big an overhead. But I guess you could argue either way. I'll let you make the call. |
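To make the trade-off discussed in this thread concrete, here is a minimal sketch of a map from service ID to a struct holding both a pointer and a remaining-task count. The `service`, `tracker`, `track`, and `taskDeleted` names are hypothetical and exist only for this illustration; the real code works on `*api.Service` and the swarmkit store.

```go
package main

import "fmt"

// service is a hypothetical stand-in for api.Service.
type service struct{ ID string }

// serviceWithTaskCount pairs a service with the number of its tasks
// that are still known to exist.
type serviceWithTaskCount struct {
	service   *service
	taskCount int
}

// tracker is a hypothetical holder for the per-service bookkeeping.
type tracker struct {
	services map[string]*serviceWithTaskCount
}

func newTracker() *tracker {
	return &tracker{services: make(map[string]*serviceWithTaskCount)}
}

// track starts monitoring a service that still has `tasks` tasks.
func (t *tracker) track(svc *service, tasks int) {
	t.services[svc.ID] = &serviceWithTaskCount{service: svc, taskCount: tasks}
}

// taskDeleted records the removal of one task. When that was the last
// tracked task, the entry is dropped and the stored service is handed
// back, so the caller needs no second lookup. Otherwise it returns nil.
func (t *tracker) taskDeleted(serviceID string) *service {
	entry, ok := t.services[serviceID]
	if !ok {
		return nil
	}
	if entry.taskCount <= 1 {
		delete(t.services, serviceID)
		return entry.service
	}
	entry.taskCount--
	return nil
}

func main() {
	tr := newTracker()
	tr.track(&service{ID: "svc1"}, 2)
	fmt.Println(tr.taskDeleted("svc1") == nil) // one task left, keep waiting
	fmt.Println(tr.taskDeleted("svc1").ID)     // last task gone, service handed back
}
```

With a plain `map[string]int` instead, the last branch would have to fetch the service from the store again before cleaning it up, which is the extra lookup the thread refers to.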
||
|
|
||
| // mainly used for tests, so that we can peek | ||
| // into the DB state in between events; | ||
| // the bool indicates whether any DB update was actually performed | ||
| eventChan chan bool | ||
|
|
||
| stopChan chan struct{} | ||
| doneChan chan struct{} | ||
| } | ||
|
|
||
| // used in our internal state's `services` right above | ||
| type serviceWithTaskCounts struct { | ||
|
Contributor
Please add a comment.
Contributor
Author
👍 |
||
| service *api.Service | ||
| taskCount int | ||
| } | ||
|
|
||
| // New creates a new deallocator | ||
| func New(store *store.MemoryStore) *Deallocator { | ||
| return &Deallocator{ | ||
| store: store, | ||
| services: make(map[string]*serviceWithTaskCounts), | ||
|
|
||
| stopChan: make(chan struct{}), | ||
| doneChan: make(chan struct{}), | ||
| } | ||
| } | ||
|
|
||
| // Run starts the deallocator, which then starts cleaning up services | ||
| // and their resources when relevant (i.e. when no tasks still exist | ||
| // for a given service). | ||
| // This is a blocking function. | ||
|
Contributor
thx for calling this out! |
||
| func (deallocator *Deallocator) Run(ctx context.Context) error { | ||
| var ( | ||
| allServices []*api.Service | ||
| allNetworks []*api.Network | ||
| ) | ||
|
|
||
| eventsChan, _, err := store.ViewAndWatch(deallocator.store, | ||
| func(readTx store.ReadTx) (err error) { | ||
| // look for services that are marked for deletion; | ||
| // there's no index on the `PendingDelete` field in the store, | ||
| // so we just iterate over all of them and filter manually. | ||
| // This is okay since we only do this at leadership change. | ||
| allServices, err = store.FindServices(readTx, store.All) | ||
|
|
||
| if err != nil { | ||
| log.G(ctx).WithError(err).Error("failed to list services in deallocator init") | ||
| return err | ||
| } | ||
|
|
||
| // now we also need to look at all existing service-level networks | ||
| // that may be marked for deletion | ||
| if allNetworks, err = store.FindNetworks(readTx, store.All); err != nil { | ||
| log.G(ctx).WithError(err).Error("failed to list networks in deallocator init") | ||
| return err | ||
| } | ||
|
|
||
| return | ||
| }, | ||
| api.EventDeleteTask{}, | ||
| api.EventUpdateService{}, | ||
| api.EventUpdateNetwork{}) | ||
|
|
||
| if err != nil { | ||
| // if we have an error here, we can't proceed any further | ||
| log.G(ctx).WithError(err).Error("failed to initialize the deallocator") | ||
| return err | ||
| } | ||
|
|
||
| defer func() { | ||
| // eventsChanCancel() | ||
| close(deallocator.doneChan) | ||
| }() | ||
|
|
||
| anyUpdated := false | ||
| // now let's populate our internal taskCounts | ||
| for _, service := range allServices { | ||
| if updated, _ := deallocator.processService(ctx, service); updated { | ||
|
Contributor
please add a comment on what processService() does
Contributor
Author
👍 |
||
| anyUpdated = true | ||
| } | ||
| } | ||
|
|
||
| // and deallocate networks that may be marked for deletion and aren't used any more | ||
| for _, network := range allNetworks { | ||
| if updated, _ := deallocator.processNetwork(ctx, nil, network, nil); updated { | ||
|
Contributor
do errors need to be handled or just ignored?
Contributor
Author
See #2759 (comment)
Contributor
Should errors be at least logged?
Contributor
Author
See #2759 (comment) |
||
| anyUpdated = true | ||
| } | ||
| } | ||
|
|
||
| // now we just need to wait for events | ||
|
Contributor
what events are we waiting for?
Contributor
Author
The ones that were listed as args to |
||
| deallocator.notifyEventChan(anyUpdated) | ||
| for { | ||
| select { | ||
| case event := <-eventsChan: | ||
| if updated, err := deallocator.processNewEvent(ctx, event); err == nil { | ||
|
Contributor
add a comment on what processNewEvent() returns and how it's used.
Contributor
Author
👍 |
||
| deallocator.notifyEventChan(updated) | ||
| } else { | ||
| log.G(ctx).WithError(err).Errorf("error processing deallocator event %#v", event) | ||
| } | ||
| case <-deallocator.stopChan: | ||
| return nil | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| } | ||
|
dperny marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // Stop stops the deallocator's routine | ||
| // FIXME (jrouge): see the comment on TaskReaper.Stop() and see when to properly stop this | ||
| // plus unit test on this! | ||
| func (deallocator *Deallocator) Stop() { | ||
| close(deallocator.stopChan) | ||
|
Collaborator
you want to wrap this in a
Contributor
Author
I'll do that when I get around to actually using it, right now I'm not clear on this. That's what the comment right before it hints at. But good catch, keeping it in mind :) |
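For context on why `Stop` is sensitive: in Go, closing an already-closed channel panics, so a `Stop` that calls `close(stopChan)` directly is only safe to call once. One common guard (not necessarily the wrapper the reviewer meant, since that part of the comment did not survive) is `sync.Once`. The sketch below uses a hypothetical `worker` type in place of the `Deallocator`.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a component with the same
// stopChan/doneChan pair as the Deallocator in the diff.
type worker struct {
	stopChan chan struct{}
	doneChan chan struct{}
	stopOnce sync.Once
}

func newWorker() *worker {
	return &worker{
		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}
}

// run blocks until stop is requested, then signals completion by
// closing doneChan.
func (w *worker) run() {
	defer close(w.doneChan)
	<-w.stopChan
}

// stop closes stopChan at most once, then waits for run to finish.
// Later calls skip the close and return as soon as doneChan is closed.
func (w *worker) stop() {
	w.stopOnce.Do(func() { close(w.stopChan) })
	<-w.doneChan
}

func main() {
	w := newWorker()
	go w.run()
	w.stop()
	w.stop() // second call neither panics nor blocks
	fmt.Println("stopped")
}
```

Note that this sketch still blocks forever if `stop` is called on a worker whose `run` was never started, which is a separate question from double-close safety.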
||
| <-deallocator.doneChan | ||
| } | ||
|
|
||
| // always a no-op, except when running tests | ||
| // see the comment about `Deallocator`'s `eventChan` field | ||
| func (deallocator *Deallocator) notifyEventChan(updated bool) { | ||
|
Contributor
please add a comment.
Contributor
Author
👍 |
||
| if deallocator.eventChan != nil { | ||
|
Contributor
should this ever be called where the deallocator.eventChan is nil?
Contributor
Author
Yes. Added comments. |
||
| deallocator.eventChan <- updated | ||
| } | ||
| } | ||
|
|
||
| // if a service is marked for deletion, this checks whether it's ready to be | ||
| // deleted yet, and does it if relevant | ||
| func (deallocator *Deallocator) processService(ctx context.Context, service *api.Service) (bool, error) { | ||
|
dperny marked this conversation as resolved.
|
||
| if !service.PendingDelete { | ||
| return false, nil | ||
| } | ||
|
|
||
| var ( | ||
| tasks []*api.Task | ||
| err error | ||
| ) | ||
|
|
||
| deallocator.store.View(func(tx store.ReadTx) { | ||
| tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID)) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| log.G(ctx).WithError(err).Errorf("failed to retrieve the list of tasks for service %v", service.ID) | ||
| // if in doubt, let's proceed to clean up the service anyway | ||
| // better to clean up resources that shouldn't be cleaned up yet | ||
| // than ending up with a service and some resources lost in limbo forever | ||
| return true, deallocator.deallocateService(ctx, service) | ||
|
Contributor
I don't think this is a good idea, especially because deletion can't be reversed. Also, I think it's better to have a leaked service object than corruption (tasks running without a service object). What do you think?
Contributor
Author
The whole point of this patch is to start asynchronously deleting services and their resources. That means that if errors occur when actually deleting, we can't just surface those to the user who requested the deletion in the first place, since their request has long since returned. That leaves us with just a few options in my opinion:
Contributor
I see your point with 2 that this behavior is the same as the previous behavior. Looks good as you have it currently.
Contributor
Author
👍 |
||
| } else if len(tasks) == 0 { | ||
| // no tasks remaining for this service, we can clean it up | ||
| return true, deallocator.deallocateService(ctx, service) | ||
| } | ||
| deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: len(tasks)} | ||
| return false, nil | ||
| } | ||
|
|
||
| func (deallocator *Deallocator) deallocateService(ctx context.Context, service *api.Service) (err error) { | ||
|
Contributor
add a comment.
Contributor
Author
👍 |
||
| err = deallocator.store.Update(func(tx store.Tx) error { | ||
| // first, let's delete the service | ||
| var ignoreServiceID *string | ||
| if err := store.DeleteService(tx, service.ID); err != nil { | ||
| // all errors are just for logging here, we do a best effort at cleaning up everything we can | ||
| log.G(ctx).WithError(err).Errorf("failed to delete service record ID %v", service.ID) | ||
| ignoreServiceID = &service.ID | ||
| } | ||
|
|
||
| // then all of its networks, provided no other service uses them | ||
| spec := service.Spec | ||
| // see https://github.com/docker/swarmkit/blob/e2aafdd3453d2ab103dd97364f79ea6b857f9446/api/specs.proto#L80-L84 | ||
| // we really should have a helper function on services to do this... | ||
| networkConfigs := spec.Task.Networks | ||
| if len(networkConfigs) == 0 { | ||
| networkConfigs = spec.Networks | ||
| } | ||
| for _, networkConfig := range networkConfigs { | ||
| if network := store.GetNetwork(tx, networkConfig.Target); network != nil { | ||
| deallocator.processNetwork(ctx, tx, network, ignoreServiceID) | ||
|
Contributor
How do you handle the return value from this function, failures etc?
Contributor
Author
See #2759 (comment)
Contributor
Should errors be at least logged?
Contributor
Author
Both |
||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| if err != nil { | ||
| log.G(ctx).WithError(err).Errorf("DB error when deallocating service %v", service.ID) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| // processNetwork deletes the network if it's marked for deletion and no | ||
| // services are using it any more (or the only one using it has ID | ||
| // `ignoreServiceID`, if not | ||
|
Contributor
Thanks for adding this, but I don't think it's obvious to a first-time reader. Why is it OK to deallocate the network when the only one using it has ID |
||
| // nil - this comes in handy when there's been an error deleting a service) | ||
| // This function can be called either when deallocating a whole service, or | ||
| // because there was an `EventUpdateNetwork` event - in the former case, the | ||
| // transaction will be that of the service deallocation, in the latter it will be nil | ||
| func (deallocator *Deallocator) processNetwork(ctx context.Context, tx store.Tx, network *api.Network, ignoreServiceID *string) (updated bool, err error) { | ||
|
Contributor
Please add comments for each of the arguments accepted by the function. e.g. it's not obvious what ignoreServiceID does.
Contributor
Author
👍
anshulpundir marked this conversation as resolved.
|
||
| if !network.PendingDelete { | ||
| return | ||
| } | ||
|
|
||
| updateFunc := func(t store.Tx) error { | ||
|
Contributor
add comments for updateFunc. Also do we really need a function object?
Contributor
Author
We do. This method can be called either with or without an open transaction, depending on whether it's done as part of a service cleanup or not. See how it's used in the |
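The pattern the author describes, one closure that either joins the caller's open transaction or opens its own, can be sketched in isolation as follows. The `tx` and `store` types and the `update` and `deleteNetwork` functions are hypothetical simplifications written for this illustration; the real code uses `store.Tx` and `MemoryStore.Update`.

```go
package main

import "fmt"

// tx is a hypothetical transaction that buffers deletions until commit.
type tx struct{ deleted []string }

// store is a hypothetical stand-in for the real store.
type store struct {
	networks map[string]bool
	commits  int
}

// update runs fn inside a fresh transaction and commits it on success.
func (s *store) update(fn func(*tx) error) error {
	txn := &tx{}
	if err := fn(txn); err != nil {
		return err
	}
	for _, id := range txn.deleted {
		delete(s.networks, id)
	}
	s.commits++
	return nil
}

// deleteNetwork wraps its work in a function value so that the same
// logic can run inside the caller's transaction (open != nil) or
// inside a transaction of its own (open == nil).
func deleteNetwork(s *store, open *tx, id string) error {
	work := func(txn *tx) error {
		txn.deleted = append(txn.deleted, id)
		return nil
	}
	if open == nil {
		return s.update(work)
	}
	return work(open)
}

func main() {
	s := &store{networks: map[string]bool{"n1": true, "n2": true}}

	// Standalone call: deleteNetwork opens and commits its own transaction.
	if err := deleteNetwork(s, nil, "n1"); err != nil {
		fmt.Println("error:", err)
	}

	// Nested call: the deletion joins a transaction the caller already holds.
	err := s.update(func(txn *tx) error {
		return deleteNetwork(s, txn, "n2")
	})
	if err != nil {
		fmt.Println("error:", err)
	}

	fmt.Println(len(s.networks), s.commits)
}
```

The function value is what lets the two call sites share one body without the nested case opening a second transaction.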
||
| services, err := store.FindServices(t, store.ByReferencedNetworkID(network.ID)) | ||
|
|
||
| if err != nil { | ||
| log.G(ctx).WithError(err).Errorf("could not fetch services using network ID %v", network.ID) | ||
| return err | ||
| } | ||
|
|
||
| noMoreServices := len(services) == 0 || | ||
| len(services) == 1 && ignoreServiceID != nil && services[0].ID == *ignoreServiceID | ||
|
|
||
| if noMoreServices { | ||
| return store.DeleteNetwork(t, network.ID) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| if tx == nil { | ||
|
anshulpundir marked this conversation as resolved.
|
||
| err = deallocator.store.Update(updateFunc) | ||
| } else { | ||
| err = updateFunc(tx) | ||
| } | ||
|
|
||
| if err != nil { | ||
| log.G(ctx).WithError(err).Errorf("DB error when deallocating network ID %v", network.ID) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| // Processes new events, and dispatches to the right method depending on what | ||
| // type of event it is. | ||
| // The boolean part of the return tuple indicates whether anything was actually | ||
| // removed from the store | ||
| func (deallocator *Deallocator) processNewEvent(ctx context.Context, event events.Event) (bool, error) { | ||
|
Contributor
plz add a comment. All functions should have comments unless it's blatantly obvious.
Contributor
Author
👍 |
||
| switch typedEvent := event.(type) { | ||
|
Collaborator
lol |
||
| case api.EventDeleteTask: | ||
| serviceID := typedEvent.Task.ServiceID | ||
|
|
||
| if serviceWithCount, present := deallocator.services[serviceID]; present { | ||
| if serviceWithCount.taskCount <= 1 { | ||
| delete(deallocator.services, serviceID) | ||
| return deallocator.processService(ctx, serviceWithCount.service) | ||
| } | ||
| serviceWithCount.taskCount-- | ||
| } | ||
|
|
||
| return false, nil | ||
| case api.EventUpdateService: | ||
| return deallocator.processService(ctx, typedEvent.Service) | ||
| case api.EventUpdateNetwork: | ||
| return deallocator.processNetwork(ctx, nil, typedEvent.Network, nil) | ||
| default: | ||
| return false, nil | ||
| } | ||
| } | ||
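As a standalone illustration of the dispatch in `processNewEvent`, the sketch below routes events with a Go type switch. The event types and the `dispatch` function are hypothetical and only mirror the shape of `api.EventDeleteTask`, `api.EventUpdateService`, and `api.EventUpdateNetwork`; they are not the swarmkit types.

```go
package main

import "fmt"

// event is a hypothetical stand-in for events.Event.
type event interface{}

// Hypothetical event types mirroring the three kinds watched in Run.
type eventDeleteTask struct{ serviceID string }
type eventUpdateService struct{ serviceID string }
type eventUpdateNetwork struct{ networkID string }

// dispatch picks a handler based on the dynamic type of the event and
// falls through to a no-op for anything it does not recognise.
func dispatch(ev event) string {
	switch e := ev.(type) {
	case eventDeleteTask:
		return "task deleted for service " + e.serviceID
	case eventUpdateService:
		return "service updated: " + e.serviceID
	case eventUpdateNetwork:
		return "network updated: " + e.networkID
	default:
		return "ignored"
	}
}

func main() {
	events := []event{
		eventDeleteTask{serviceID: "svc1"},
		eventUpdateNetwork{networkID: "net1"},
		42, // an unrelated value, handled by the default branch
	}
	for _, ev := range events {
		fmt.Println(dispatch(ev))
	}
}
```

Inside each case, `e` already has the concrete type, which is why the handlers in the diff can read `typedEvent.Task`, `typedEvent.Service`, or `typedEvent.Network` without further assertions.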