From 11cbcdbedde1fee8c609c9f5d9d26aefa33ad103 Mon Sep 17 00:00:00 2001
From: Ying
Date: Mon, 24 Jul 2017 17:47:39 -0700
Subject: [PATCH] Update the swarmkit vendor to include the following changes:

- https://github.com/docker/swarmkit/pull/2309 (updating the service spec version when rolling back)
- https://github.com/docker/swarmkit/pull/2310 (fix for slow swarm shutdown)
- https://github.com/docker/swarmkit/pull/2323 (run watchapi server on all managers)

Signed-off-by: Ying
---
 components/engine/vendor.conf                 |  2 +-
 .../swarmkit/manager/logbroker/broker.go      | 37 ++++++++++-------
 .../docker/swarmkit/manager/manager.go        | 24 ++++++-----
 .../manager/orchestrator/update/updater.go    |  2 +
 .../swarmkit/manager/watchapi/server.go       | 41 ++++++++++++++++++-
 .../docker/swarmkit/manager/watchapi/watch.go |  9 ++++
 6 files changed, 89 insertions(+), 26 deletions(-)

diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf
index 76641c227d9..52bbca365ad 100644
--- a/components/engine/vendor.conf
+++ b/components/engine/vendor.conf
@@ -108,7 +108,7 @@ github.com/containerd/containerd 6e23458c129b551d5c9871e5174f6b1b7f6d1170 https:
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
 
 # cluster
-github.com/docker/swarmkit fb828cea0ec518dadea0f04900e0057e38194562
+github.com/docker/swarmkit a0a7f6f663c35c92ddcd73e2c1b97b0f4ed8caf3
 github.com/gogo/protobuf v0.4
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
 github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
index f5ec2b30bd1..5eededfc055 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
@@ -57,12 +57,12 @@ func New(store *store.MemoryStore) *LogBroker {
 	}
 }
 
-// Run the log broker
-func (lb *LogBroker) Run(ctx context.Context) error {
+// Start starts the log broker
+func (lb *LogBroker) Start(ctx context.Context) error {
 	lb.mu.Lock()
+	defer lb.mu.Unlock()
 
 	if lb.cancelAll != nil {
-		lb.mu.Unlock()
 		return errAlreadyRunning
 	}
 
@@ -71,12 +71,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
 	lb.subscriptionQueue = watch.NewQueue()
 	lb.registeredSubscriptions = make(map[string]*subscription)
 	lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
-	lb.mu.Unlock()
-
-	select {
-	case <-lb.pctx.Done():
-		return lb.pctx.Err()
-	}
+	return nil
 }
 
 // Stop stops the log broker
@@ -234,8 +229,15 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
 		return err
 	}
 
+	lb.mu.Lock()
+	pctx := lb.pctx
+	lb.mu.Unlock()
+	if pctx == nil {
+		return errNotRunning
+	}
+
 	subscription := lb.newSubscription(request.Selector, request.Options)
-	subscription.Run(lb.pctx)
+	subscription.Run(pctx)
 	defer subscription.Stop()
 
 	log := log.G(ctx).WithFields(
@@ -257,8 +259,8 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-lb.pctx.Done():
-			return lb.pctx.Err()
+		case <-pctx.Done():
+			return pctx.Err()
 		case event := <-publishCh:
 			publish := event.(*logMessage)
 			if publish.completed {
@@ -308,6 +310,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 		return err
 	}
 
+	lb.mu.Lock()
+	pctx := lb.pctx
+	lb.mu.Unlock()
+	if pctx == nil {
+		return errNotRunning
+	}
+
 	lb.nodeConnected(remote.NodeID)
 	defer lb.nodeDisconnected(remote.NodeID)
 
@@ -329,7 +338,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 		select {
 		case <-stream.Context().Done():
 			return stream.Context().Err()
-		case <-lb.pctx.Done():
+		case <-pctx.Done():
 			return nil
 		default:
 		}
@@ -362,7 +371,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 			}
 		case <-stream.Context().Done():
 			return stream.Context().Err()
-		case <-lb.pctx.Done():
+		case <-pctx.Done():
 			return nil
 		}
 	}
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go b/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go
index 311e63da6ca..b8d7383f8ea 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/manager.go
@@ -129,6 +129,7 @@ type Manager struct {
 	caserver               *ca.Server
 	dispatcher             *dispatcher.Dispatcher
 	logbroker              *logbroker.LogBroker
+	watchServer            *watchapi.Server
 	replicatedOrchestrator *replicated.Orchestrator
 	globalOrchestrator     *global.Orchestrator
 	taskReaper             *taskreaper.TaskReaper
@@ -220,6 +221,7 @@ func New(config *Config) (*Manager, error) {
 		caserver:    ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
 		dispatcher:  dispatcher.New(raftNode, dispatcher.DefaultConfig()),
 		logbroker:   logbroker.New(raftNode.MemoryStore()),
+		watchServer: watchapi.NewServer(raftNode.MemoryStore()),
 		server:      grpc.NewServer(opts...),
 		localserver: grpc.NewServer(opts...),
 		raftNode:    raftNode,
@@ -397,13 +399,12 @@ func (m *Manager) Run(parent context.Context) error {
 	}
 
 	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter)
-	baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore())
 	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
 	healthServer := health.NewHealthServer()
 	localHealthServer := health.NewHealthServer()
 
 	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
-	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize)
+	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
 	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
 	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
 	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
@@ -476,7 +477,7 @@ func (m *Manager) Run(parent context.Context) error {
 	grpc_prometheus.Register(m.server)
 
 	api.RegisterControlServer(m.localserver, localProxyControlAPI)
-	api.RegisterWatchServer(m.localserver, baseWatchAPI)
+	api.RegisterWatchServer(m.localserver, m.watchServer)
 	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
 	api.RegisterHealthServer(m.localserver, localHealthServer)
 	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
@@ -489,6 +490,10 @@ func (m *Manager) Run(parent context.Context) error {
 	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
 	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
 
+	if err := m.watchServer.Start(ctx); err != nil {
+		log.G(ctx).WithError(err).Error("watch server failed to start")
+	}
+
 	go m.serveListener(ctx, m.remoteListener)
 	go m.serveListener(ctx, m.controlListener)
 
@@ -564,8 +569,8 @@ func (m *Manager) Run(parent context.Context) error {
 const stopTimeout = 8 * time.Second
 
 // Stop stops the manager. It immediately closes all open connections and
-// active RPCs as well as stopping the scheduler. If clearData is set, the
-// raft logs, snapshots, and keys will be erased.
+// active RPCs as well as stopping the manager's subsystems. If clearData is
+// set, the raft logs, snapshots, and keys will be erased.
 func (m *Manager) Stop(ctx context.Context, clearData bool) {
 	log.G(ctx).Info("Stopping manager")
 	// It's not safe to start shutting down while the manager is still
@@ -599,6 +604,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
 
 	m.dispatcher.Stop()
 	m.logbroker.Stop()
+	m.watchServer.Stop()
 	m.caserver.Stop()
 
 	if m.allocator != nil {
@@ -1000,11 +1006,9 @@ func (m *Manager) becomeLeader(ctx context.Context) {
 		}
 	}(m.dispatcher)
 
-	go func(lb *logbroker.LogBroker) {
-		if err := lb.Run(ctx); err != nil {
-			log.G(ctx).WithError(err).Error("LogBroker exited with an error")
-		}
-	}(m.logbroker)
+	if err := m.logbroker.Start(ctx); err != nil {
+		log.G(ctx).WithError(err).Error("LogBroker failed to start")
+	}
 
 	go func(server *ca.Server) {
 		if err := server.Run(ctx); err != nil {
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go
index 2b1f55d3f2f..41cccf837ae 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go
@@ -601,7 +601,9 @@ func (u *Updater) rollbackUpdate(ctx context.Context, serviceID, message string)
 			return errors.New("cannot roll back service because no previous spec is available")
 		}
 		service.Spec = *service.PreviousSpec
+		service.SpecVersion = service.PreviousSpecVersion.Copy()
 		service.PreviousSpec = nil
+		service.PreviousSpecVersion = nil
 
 		return store.UpdateService(tx, service)
 	})
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go
index 07cdedbb36c..6d49dca715d 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/server.go
@@ -1,12 +1,24 @@
 package watchapi
 
 import (
+	"errors"
+	"sync"
+
 	"github.com/docker/swarmkit/manager/state/store"
+	"golang.org/x/net/context"
+)
+
+var (
+	errAlreadyRunning = errors.New("broker is already running")
+	errNotRunning     = errors.New("broker is not running")
 )
 
 // Server is the store API gRPC server.
 type Server struct {
-	store *store.MemoryStore
+	store     *store.MemoryStore
+	mu        sync.Mutex
+	pctx      context.Context
+	cancelAll func()
 }
 
 // NewServer creates a store API server.
@@ -15,3 +27,30 @@ func NewServer(store *store.MemoryStore) *Server {
 		store: store,
 	}
 }
+
+// Start starts the watch server.
+func (s *Server) Start(ctx context.Context) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.cancelAll != nil {
+		return errAlreadyRunning
+	}
+
+	s.pctx, s.cancelAll = context.WithCancel(ctx)
+	return nil
+}
+
+// Stop stops the watch server.
+func (s *Server) Stop() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.cancelAll == nil {
+		return errNotRunning
+	}
+	s.cancelAll()
+	s.cancelAll = nil
+
+	return nil
+}
diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go
index 555b8997439..53bed49f1cb 100644
--- a/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go
+++ b/components/engine/vendor/github.com/docker/swarmkit/manager/watchapi/watch.go
@@ -17,6 +17,13 @@ import (
 func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) error {
 	ctx := stream.Context()
 
+	s.mu.Lock()
+	pctx := s.pctx
+	s.mu.Unlock()
+	if pctx == nil {
+		return errNotRunning
+	}
+
 	watchArgs, err := api.ConvertWatchArgs(request.Entries)
 	if err != nil {
 		return grpc.Errorf(codes.InvalidArgument, "%s", err.Error())
@@ -39,6 +46,8 @@ func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer)
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
+		case <-pctx.Done():
+			return pctx.Err()
 		case event := <-watch:
 			if commitEvent, ok := event.(state.EventCommit); ok && len(events) > 0 {
 				if err := stream.Send(&api.WatchMessage{Events: events, Version: commitEvent.Version}); err != nil {