From 1dd11b3bb69eee76e261dfb82a67eafbf99067d3 Mon Sep 17 00:00:00 2001 From: Ying Date: Mon, 24 Jul 2017 18:01:58 -0700 Subject: [PATCH] Re-vendors swarmkit to include the following fixes: - https://github.com/docker/swarmkit/pull/2288 (Allow updates of failed services with restart policy "none") - https://github.com/docker/swarmkit/pull/2304 (Reset restart history when task spec changes) - https://github.com/docker/swarmkit/pull/2309 (updating the service spec version when rolling back) - https://github.com/docker/swarmkit/pull/2310 (fix for slow swarm shutdown) Signed-off-by: Ying --- components/engine/vendor.conf | 4 +- .../swarmkit/manager/logbroker/broker.go | 37 ++++++++----- .../docker/swarmkit/manager/manager.go | 20 ++++--- .../manager/orchestrator/global/global.go | 33 +----------- .../manager/orchestrator/restart/restart.go | 52 ++++++++++--------- .../manager/orchestrator/update/updater.go | 2 + .../swarmkit/manager/watchapi/server.go | 41 ++++++++++++++- .../docker/swarmkit/manager/watchapi/watch.go | 9 ++++ 8 files changed, 118 insertions(+), 80 deletions(-) diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index 46934bae28a..342b826344e 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -106,7 +106,7 @@ github.com/stevvooe/continuity cd7a8e21e2b6f84799f5dd4b65faf49c8d3ee02d github.com/tonistiigi/fsutil 0ac4c11b053b9c5c7c47558f81f96c7100ce50fb # cluster -github.com/docker/swarmkit 3e2dd3c0a76149b1620b42d28dd6ff48270404e5 +github.com/docker/swarmkit 069d13ff72a214cdd7a06821299987b28ee2a627 github.com/gogo/protobuf v0.4 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e @@ -143,4 +143,4 @@ github.com/opencontainers/selinux v1.0.0-rc1 # git --git-dir ./go/.git --work-tree ./go checkout revert-prefix-ignore # cp -a go/src/archive/tar ./vendor/archive/tar # rm -rf ./go -# vndr \ No newline at end of file +# vndr diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index f5ec2b30bd1..5eededfc055 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -57,12 +57,12 @@ func New(store *store.MemoryStore) *LogBroker { } } -// Run the log broker -func (lb *LogBroker) Run(ctx context.Context) error { +// Start starts the log broker +func (lb *LogBroker) Start(ctx context.Context) error { lb.mu.Lock() + defer lb.mu.Unlock() if lb.cancelAll != nil { - lb.mu.Unlock() return errAlreadyRunning } @@ -71,12 +71,7 @@ func (lb *LogBroker) Run(ctx context.Context) error { lb.subscriptionQueue = watch.NewQueue() lb.registeredSubscriptions = make(map[string]*subscription) lb.subscriptionsByNode = make(map[string]map[*subscription]struct{}) - lb.mu.Unlock() - - select { - case <-lb.pctx.Done(): - return lb.pctx.Err() - } + return nil } // Stop stops the log broker @@ -234,8 +229,15 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api return err } + lb.mu.Lock() + pctx := lb.pctx + lb.mu.Unlock() + if pctx == nil { + return errNotRunning + } + subscription := lb.newSubscription(request.Selector, request.Options) - subscription.Run(lb.pctx) + subscription.Run(pctx) defer subscription.Stop() log := log.G(ctx).WithFields( @@ -257,8 +259,8 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api select { case <-ctx.Done(): return ctx.Err() - case <-lb.pctx.Done(): - return lb.pctx.Err() + case <-pctx.Done(): + return pctx.Err() case event := <-publishCh: publish := event.(*logMessage) if publish.completed { @@ -308,6 +310,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest return err } + lb.mu.Lock() + pctx := lb.pctx + lb.mu.Unlock() + if pctx == nil { + return errNotRunning + } + lb.nodeConnected(remote.NodeID) defer lb.nodeDisconnected(remote.NodeID) @@ -329,7 +338,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest select { case <-stream.Context().Done(): return stream.Context().Err() - case <-lb.pctx.Done(): + case <-pctx.Done(): return nil default: } @@ -362,7 +371,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest } case <-stream.Context().Done(): return stream.Context().Err() - case <-lb.pctx.Done(): + case <-pctx.Done(): return nil } } diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go b/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go index 4771c9dcbe8..3fb0518b535 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go @@ -130,6 +130,7 @@ type Manager struct { caserver *ca.Server dispatcher *dispatcher.Dispatcher logbroker *logbroker.LogBroker + watchServer *watchapi.Server replicatedOrchestrator *replicated.Orchestrator globalOrchestrator *global.Orchestrator taskReaper *taskreaper.TaskReaper @@ -221,6 +222,7 @@ func New(config *Config) (*Manager, error) { caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter)), logbroker: logbroker.New(raftNode.MemoryStore()), + watchServer: watchapi.NewServer(raftNode.MemoryStore()), server: grpc.NewServer(opts...), localserver: grpc.NewServer(opts...), raftNode: raftNode, @@ -398,13 +400,12 @@ func (m *Manager) Run(parent context.Context) error { } baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter) - baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore()) baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore()) healthServer := health.NewHealthServer() localHealthServer := health.NewHealthServer() authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize) - authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize) + authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize) authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize) authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize) authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize) @@ -477,7 +478,7 @@ func (m *Manager) Run(parent context.Context) error { grpc_prometheus.Register(m.server) api.RegisterControlServer(m.localserver, localProxyControlAPI) - api.RegisterWatchServer(m.localserver, baseWatchAPI) + api.RegisterWatchServer(m.localserver, m.watchServer) api.RegisterLogsServer(m.localserver, localProxyLogsAPI) api.RegisterHealthServer(m.localserver, localHealthServer) api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI) @@ -1001,11 +1002,13 @@ func (m *Manager) becomeLeader(ctx context.Context) { } }(m.dispatcher) - go func(lb *logbroker.LogBroker) { - if err := lb.Run(ctx); err != nil { - log.G(ctx).WithError(err).Error("LogBroker exited with an error") - } - }(m.logbroker) + if err := m.logbroker.Start(ctx); err != nil { + log.G(ctx).WithError(err).Error("LogBroker failed to start") + } + + if err := m.watchServer.Start(ctx); err != nil { + log.G(ctx).WithError(err).Error("watch server failed to start") + } go func(server *ca.Server) { if err := server.Run(ctx); err != nil { @@ -1059,6 +1062,7 @@ func (m *Manager) becomeLeader(ctx context.Context) { func (m *Manager) becomeFollower() { m.dispatcher.Stop() m.logbroker.Stop() + m.watchServer.Stop() m.caserver.Stop() if m.allocator != nil { diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go index b89a105fec1..ee4c2baea3b 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go @@ -169,12 +169,6 @@ func (g *Orchestrator) Run(ctx context.Context) error { delete(g.nodes, v.Node.ID) case api.EventUpdateTask: g.handleTaskChange(ctx, v.Task) - case api.EventDeleteTask: - // CLI allows deleting task - if _, exists := g.globalServices[v.Task.ServiceID]; !exists { - continue - } - g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID) } case <-g.stopChan: return nil @@ -216,7 +210,7 @@ func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) { if _, exists := g.globalServices[t.ServiceID]; !exists { return } - // if a task's DesiredState has past running, which + // if a task's DesiredState has passed running, it // means the task has been processed if t.DesiredState > api.TaskStateRunning { return @@ -264,7 +258,6 @@ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, } func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) { - nodeCompleted := make(map[string]map[string]struct{}) nodeTasks := make(map[string]map[string][]*api.Task) g.store.View(func(tx store.ReadTx) { @@ -275,8 +268,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin continue } - // a node may have completed this service - nodeCompleted[serviceID] = make(map[string]struct{}) // nodeID -> task list nodeTasks[serviceID] = make(map[string][]*api.Task) @@ -284,11 +275,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin if t.DesiredState <= api.TaskStateRunning { // Collect all running instances of this service nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t) - } else { - // for finished tasks, check restartPolicy - if isTaskCompleted(t, orchestrator.RestartCondition(t)) { - nodeCompleted[serviceID][t.NodeID] = struct{}{} - } } } } @@ -311,9 +297,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin ntasks := nodeTasks[serviceID][nodeID] delete(nodeTasks[serviceID], nodeID) - // if restart policy considers this node has finished its task - // it should remove all running tasks - if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints { + if !meetsConstraints { g.shutdownTasks(ctx, batch, ntasks) continue } @@ -400,8 +384,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs return } - // whether each service has completed on the node - completed := make(map[string]bool) // tasks by service tasks := make(map[string][]*api.Task) @@ -425,10 +407,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs } if t.DesiredState <= api.TaskStateRunning { tasks[serviceID] = append(tasks[serviceID], t) - } else { - if isTaskCompleted(t, orchestrator.RestartCondition(t)) { - completed[serviceID] = true - } } } } @@ -444,13 +422,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs continue } - // if restart policy considers this node has finished its task - // it should remove all running tasks - if completed[serviceID] { - g.shutdownTasks(ctx, batch, tasks[serviceID]) - continue - } - if node.Spec.Availability == api.NodeAvailabilityPause { // the node is paused, so we won't add or update tasks continue diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go index 6167552d4ae..eed28f8202c 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/restart/restart.go @@ -30,6 +30,13 @@ type instanceRestartInfo struct { // Restart.MaxAttempts and Restart.Window are both // nonzero. restartedInstances *list.List + // Why is specVersion in this structure and not in the map key? While + // putting it in the key would be a very simple solution, it wouldn't + // be easy to clean up map entries corresponding to old specVersions. + // Making the key version-agnostic and clearing the value whenever the + // version changes avoids the issue of stale map entries for old + // versions. + specVersion api.Version } type delayedStart struct { @@ -54,8 +61,7 @@ type Supervisor struct { mu sync.Mutex store *store.MemoryStore delays map[string]*delayedStart - history map[instanceTuple]*instanceRestartInfo - historyByService map[string]map[instanceTuple]struct{} + historyByService map[string]map[instanceTuple]*instanceRestartInfo TaskTimeout time.Duration } @@ -64,8 +70,7 @@ func NewSupervisor(store *store.MemoryStore) *Supervisor { return &Supervisor{ store: store, delays: make(map[string]*delayedStart), - history: make(map[instanceTuple]*instanceRestartInfo), - historyByService: make(map[string]map[instanceTuple]struct{}), + historyByService: make(map[string]map[instanceTuple]*instanceRestartInfo), TaskTimeout: defaultOldTaskTimeout, } } @@ -214,8 +219,8 @@ func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *ap r.mu.Lock() defer r.mu.Unlock() - restartInfo := r.history[instanceTuple] - if restartInfo == nil { + restartInfo := r.historyByService[t.ServiceID][instanceTuple] + if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) { return true } @@ -268,17 +273,26 @@ func (r *Supervisor) recordRestartHistory(restartTask *api.Task) { r.mu.Lock() defer r.mu.Unlock() - if r.history[tuple] == nil { - r.history[tuple] = &instanceRestartInfo{} + if r.historyByService[restartTask.ServiceID] == nil { + r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]*instanceRestartInfo) + } + if r.historyByService[restartTask.ServiceID][tuple] == nil { + r.historyByService[restartTask.ServiceID][tuple] = &instanceRestartInfo{} } - restartInfo := r.history[tuple] - restartInfo.totalRestarts++ + restartInfo := r.historyByService[restartTask.ServiceID][tuple] - if r.historyByService[restartTask.ServiceID] == nil { - r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]struct{}) + if restartTask.SpecVersion != nil && *restartTask.SpecVersion != restartInfo.specVersion { + // This task has a different SpecVersion from the one we're + // tracking. Most likely, the service was updated. Past failures + // shouldn't count against the new service definition, so clear + // the history for this instance. + *restartInfo = instanceRestartInfo{ + specVersion: *restartTask.SpecVersion, + } } - r.historyByService[restartTask.ServiceID][tuple] = struct{}{} + + restartInfo.totalRestarts++ if restartTask.Spec.Restart.Window != nil && (restartTask.Spec.Restart.Window.Seconds != 0 || restartTask.Spec.Restart.Window.Nanos != 0) { if restartInfo.restartedInstances == nil { @@ -432,16 +446,6 @@ func (r *Supervisor) CancelAll() { // ClearServiceHistory forgets restart history related to a given service ID. func (r *Supervisor) ClearServiceHistory(serviceID string) { r.mu.Lock() - defer r.mu.Unlock() - - tuples := r.historyByService[serviceID] - if tuples == nil { - return - } - delete(r.historyByService, serviceID) - - for t := range tuples { - delete(r.history, t) - } + r.mu.Unlock() } diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go index 2b1f55d3f2f..41cccf837ae 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go @@ -601,7 +601,9 @@ func (u *Updater) rollbackUpdate(ctx context.Context, serviceID, message string) return errors.New("cannot roll back service because no previous spec is available") } service.Spec = *service.PreviousSpec + service.SpecVersion = service.PreviousSpecVersion.Copy() service.PreviousSpec = nil + service.PreviousSpecVersion = nil return store.UpdateService(tx, service) }) diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go index 07cdedbb36c..6d49dca715d 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go @@ -1,12 +1,24 @@ package watchapi import ( + "errors" + "sync" + "github.com/docker/swarmkit/manager/state/store" + "golang.org/x/net/context" +) + +var ( + errAlreadyRunning = errors.New("broker is already running") + errNotRunning = errors.New("broker is not running") ) // Server is the store API gRPC server. type Server struct { - store *store.MemoryStore + store *store.MemoryStore + mu sync.Mutex + pctx context.Context + cancelAll func() } // NewServer creates a store API server. @@ -15,3 +27,30 @@ func NewServer(store *store.MemoryStore) *Server { store: store, } } + +// Start starts the watch server. +func (s *Server) Start(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cancelAll != nil { + return errAlreadyRunning + } + + s.pctx, s.cancelAll = context.WithCancel(ctx) + return nil +} + +// Stop stops the watch server. +func (s *Server) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cancelAll == nil { + return errNotRunning + } + s.cancelAll() + s.cancelAll = nil + + return nil +} diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go index 555b8997439..53bed49f1cb 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go @@ -17,6 +17,13 @@ import ( func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) error { ctx := stream.Context() + s.mu.Lock() + pctx := s.pctx + s.mu.Unlock() + if pctx == nil { + return errNotRunning + } + watchArgs, err := api.ConvertWatchArgs(request.Entries) if err != nil { return grpc.Errorf(codes.InvalidArgument, "%s", err.Error()) @@ -39,6 +46,8 @@ func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) select { case <-ctx.Done(): return ctx.Err() + case <-pctx.Done(): + return pctx.Err() case event := <-watch: if commitEvent, ok := event.(state.EventCommit); ok && len(events) > 0 { if err := stream.Send(&api.WatchMessage{Events: events, Version: commitEvent.Version}); err != nil {