Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
795 changes: 448 additions & 347 deletions api/types.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions api/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,15 @@ message TaskStatus {
// HostPorts provides a list of ports allocated at the host
// level.
PortStatus port_status = 6;

// AppliedBy gives the node ID of the manager that applied this task
// status update to the Task object.
string applied_by = 7;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking question - more information is always helpful for debugging, but what is this intended to be used for?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thinking is that since we're storing a timestamp, it's useful to know what frame of reference that timestamp comes from, in case we later add logic to handle clock skew.

I'm fine with removing this for now. I suppose it could be useful for debugging, but that's not a very strong argument for having it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'd be a cool thing to have eventually - would we store historical skew data between the manager nodes for the adjustment?


// AppliedAt gives a timestamp of when this status update was applied to
// the Task object.
// Note: can't use stdtime because this field is nullable.
google.protobuf.Timestamp applied_at = 8;
}

// NetworkAttachmentConfig specifies how a service should be attached to a particular network.
Expand Down
7 changes: 6 additions & 1 deletion manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/drivers"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/watch"
gogotypes "github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -127,6 +128,7 @@ type Dispatcher struct {
cancel context.CancelFunc
clusterUpdateQueue *watch.Queue
dp *drivers.DriverProvider
securityConfig *ca.SecurityConfig

taskUpdates map[string]*api.TaskStatus // indexed by task ID
taskUpdatesLock sync.Mutex
Expand All @@ -144,7 +146,7 @@ type Dispatcher struct {
}

// New returns Dispatcher with cluster interface(usually raft.Node).
func New(cluster Cluster, c *Config, dp *drivers.DriverProvider) *Dispatcher {
func New(cluster Cluster, c *Config, dp *drivers.DriverProvider, securityConfig *ca.SecurityConfig) *Dispatcher {
d := &Dispatcher{
dp: dp,
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
Expand All @@ -153,6 +155,7 @@ func New(cluster Cluster, c *Config, dp *drivers.DriverProvider) *Dispatcher {
cluster: cluster,
processUpdatesTrigger: make(chan struct{}, 1),
config: c,
securityConfig: securityConfig,
}

d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)
Expand Down Expand Up @@ -630,6 +633,8 @@ func (d *Dispatcher) processUpdates(ctx context.Context) {
}

task.Status = *status
task.Status.AppliedBy = d.securityConfig.ClientTLSCreds.NodeID()
task.Status.AppliedAt = ptypes.MustTimestampProto(time.Now())
if err := store.UpdateTask(tx, task); err != nil {
logger.WithError(err).Error("failed to update task status")
return nil
Expand Down
2 changes: 1 addition & 1 deletion manager/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func startDispatcher(c *Config) (*grpcDispatcher, error) {
s := grpc.NewServer(serverOpts...)
tc := newTestCluster(l.Addr().String(), tca.MemoryStore)
driverGetter := &mockPluginGetter{}
d := New(tc, c, drivers.New(driverGetter))
d := New(tc, c, drivers.New(driverGetter), managerSecurityConfig)

authorize := func(ctx context.Context, roles []string) error {
_, err := ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, tca.Organization, nil)
Expand Down
2 changes: 1 addition & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func New(config *Config) (*Manager, error) {
m := &Manager{
config: *config,
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter)),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter), config.SecurityConfig),
logbroker: logbroker.New(raftNode.MemoryStore()),
watchServer: watchapi.NewServer(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
Expand Down
60 changes: 43 additions & 17 deletions manager/orchestrator/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin

g.store.View(func(tx store.ReadTx) {
for _, serviceID := range serviceIDs {
service := g.globalServices[serviceID].Service
if service == nil {
continue
}

tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
Expand All @@ -265,11 +270,22 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
nodeTasks[serviceID] = make(map[string][]*api.Task)

for _, t := range tasks {
if t.DesiredState <= api.TaskStateRunning {
// Collect all running instances of this service
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
}

// Keep all runnable instances of this service,
// as well as instances that were not restarted due
// to the restart policy but may be updated if the
// service spec changed.
for nodeID, slot := range nodeTasks[serviceID] {
updatable := g.restarts.UpdatableTasksInSlot(ctx, slot, g.globalServices[serviceID].Service)
if len(updatable) != 0 {
nodeTasks[serviceID][nodeID] = updatable
} else {
delete(nodeTasks[serviceID], nodeID)
}
}

}
})

Expand Down Expand Up @@ -374,11 +390,6 @@ func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
return
}

var serviceIDs []string
for id := range g.globalServices {
serviceIDs = append(serviceIDs, id)
}

node, exists := g.nodes[node.ID]
if !exists {
return
Expand All @@ -400,24 +411,31 @@ func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
return
}

for _, serviceID := range serviceIDs {
for serviceID, service := range g.globalServices {
for _, t := range tasksOnNode {
if t.ServiceID != serviceID {
continue
}
if t.DesiredState <= api.TaskStateRunning {
tasks[serviceID] = append(tasks[serviceID], t)
tasks[serviceID] = append(tasks[serviceID], t)
}

// Keep all runnable instances of this service,
// as well as instances that were not restarted due
// to the restart policy but may be updated if the
// service spec changed.
for serviceID, slot := range tasks {
updatable := g.restarts.UpdatableTasksInSlot(ctx, slot, service.Service)

if len(updatable) != 0 {
tasks[serviceID] = updatable
} else {
delete(tasks, serviceID)
}
}
}

err = g.store.Batch(func(batch *store.Batch) error {
for _, serviceID := range serviceIDs {
service, exists := g.globalServices[serviceID]
if !exists {
continue
}

for serviceID, service := range g.globalServices {
if !constraint.NodeMatches(service.constraints, node) {
continue
}
Expand Down Expand Up @@ -560,6 +578,14 @@ func (g *Orchestrator) IsRelatedService(service *api.Service) bool {
return orchestrator.IsGlobalService(service)
}

// SlotTuple returns the slot tuple identifying a global service task.
// Global services are slotted per (service, node) pair, so no numeric
// slot index is involved.
func (g *Orchestrator) SlotTuple(t *api.Task) orchestrator.SlotTuple {
	var tuple orchestrator.SlotTuple
	tuple.ServiceID = t.ServiceID
	tuple.NodeID = t.NodeID
	return tuple
}

func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
if t == nil || t.DesiredState <= api.TaskStateRunning {
return false
Expand Down
Loading