From e08257f90e45bdb37355c13cb994948f9c6cca65 Mon Sep 17 00:00:00 2001
From: Anshul Pundir
Date: Mon, 30 Oct 2017 18:13:12 -0700
Subject: [PATCH] Comments for orphaned state/task reaper.

Signed-off-by: Anshul Pundir
---
 api/types.pb.go                                |  5 ++-
 api/types.proto                                |  5 ++-
 .../orchestrator/taskreaper/task_reaper.go     | 43 +++++++++++++++----
 3 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/api/types.pb.go b/api/types.pb.go
index 33e2281b67..5b9b719ce1 100644
--- a/api/types.pb.go
+++ b/api/types.pb.go
@@ -72,7 +72,10 @@ const (
 	TaskStateShutdown TaskState = 640
 	TaskStateFailed   TaskState = 704
 	TaskStateRejected TaskState = 768
-	TaskStateOrphaned TaskState = 832
+	// The main purpose of this state is to free up resources associated with service tasks on
+	// unresponsive nodes without having to delete those tasks. This state is directly assigned
+	// to the task by the orchestrator.
+	TaskStateOrphaned TaskState = 832
 )
 
 var TaskState_name = map[int32]string{
diff --git a/api/types.proto b/api/types.proto
index 635d12b200..8906b9622f 100644
--- a/api/types.proto
+++ b/api/types.proto
@@ -458,7 +458,10 @@ enum TaskState {
 	SHUTDOWN = 640 [(gogoproto.enumvalue_customname)="TaskStateShutdown"]; // orchestrator requested shutdown
 	FAILED = 704 [(gogoproto.enumvalue_customname)="TaskStateFailed"]; // task execution failed with error
 	REJECTED = 768 [(gogoproto.enumvalue_customname)="TaskStateRejected"]; // task could not be executed here.
-	ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"]; // The node on which this task is scheduled is Down for too long
+	// The main purpose of this state is to free up resources associated with service tasks on
+	// unresponsive nodes without having to delete those tasks. This state is directly assigned
+	// to the task by the orchestrator.
+	ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"];
 
 	// NOTE(stevvooe): The state of a task is actually a lamport clock, in that
 	// given two observations, the greater of the two can be considered
diff --git a/manager/orchestrator/taskreaper/task_reaper.go b/manager/orchestrator/taskreaper/task_reaper.go
index 577319c8e8..e82208f476 100644
--- a/manager/orchestrator/taskreaper/task_reaper.go
+++ b/manager/orchestrator/taskreaper/task_reaper.go
@@ -23,12 +23,17 @@ const (
 // exist for the same service/instance or service/nodeid combination.
 type TaskReaper struct {
 	store *store.MemoryStore
+	// taskHistory is the number of tasks to keep per slot or node.
 	taskHistory int64
-	dirty       map[orchestrator.SlotTuple]struct{}
-	orphaned    []string
-	stopChan    chan struct{}
-	doneChan    chan struct{}
+
+	// Set of slot tuples to be inspected for task history cleanup.
+	dirty map[orchestrator.SlotTuple]struct{}
+
+	// IDs of serviceless orphaned tasks collected for cleanup.
+	orphaned []string
+	stopChan chan struct{}
+	doneChan chan struct{}
 }
 
 // New creates a new TaskReaper.
@@ -41,7 +46,11 @@ func New(store *store.MemoryStore) *TaskReaper {
 	}
 }
 
-// Run is the TaskReaper's main loop.
+// Run is the TaskReaper's watch loop, which collects candidates for cleanup.
+// Task history is mainly used for task restarts, but is also available for administrative purposes.
+// Note that task history is stored per-slot-per-service for replicated services
+// and per-node-per-service for global services. History does not apply to serviceless
+// tasks, since they are not attached to a service.
 func (tr *TaskReaper) Run(ctx context.Context) {
 	watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})
@@ -59,6 +68,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 		tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
 	}
 
+	// On startup, scan the entire store and inspect orphaned tasks left over from a previous life.
 	tasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
 	if err != nil {
 		log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
@@ -67,21 +77,31 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 
 	if len(tasks) > 0 {
 		for _, t := range tasks {
-			// Do not reap service tasks immediately
+			// Do not reap service tasks immediately.
+			// Let them go through the regular history cleanup process
+			// of checking TaskHistoryRetentionLimit.
 			if t.ServiceID != "" {
 				continue
 			}
+			// Serviceless tasks can be cleaned up right away, since they are not attached to a service.
 			tr.orphaned = append(tr.orphaned, t.ID)
 		}
 
+		// Clean up orphaned serviceless tasks right away.
 		if len(tr.orphaned) > 0 {
 			tr.tick()
 		}
 	}
 
+	// Clean up in batches: a cleanup pass runs either when enough cleanup candidates
+	// have accumulated or when the batching timer expires, whichever happens first.
 	timer := time.NewTimer(reaperBatchingInterval)
 
+	// Watch for:
+	// 1. EventCreateTask for cleaning slots, which is the best time to clean up that node/slot.
+	// 2. EventUpdateTask for cleaning serviceless orphaned tasks (when the orchestrator updates the task status to ORPHANED).
+	// 3. EventUpdateCluster for TaskHistoryRetentionLimit updates.
 	for {
 		select {
 		case event := <-watcher:
@@ -118,6 +138,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
 	}
 }
 
+// tick performs task history cleanup.
 func (tr *TaskReaper) tick() {
 	if len(tr.dirty) == 0 && len(tr.orphaned) == 0 {
 		return
@@ -131,6 +152,8 @@ func (tr *TaskReaper) tick() {
 	for _, tID := range tr.orphaned {
 		deleteTasks[tID] = struct{}{}
 	}
+
+	// Check the history of each dirty slot for tasks to clean up.
 	tr.store.View(func(tx store.ReadTx) {
 		for dirty := range tr.dirty {
 			service := store.GetService(tx, dirty.ServiceID)
@@ -141,8 +164,8 @@ func (tr *TaskReaper) tick() {
 			taskHistory := tr.taskHistory
 
 			// If MaxAttempts is set, keep at least one more than
-			// that number of tasks. This is necessary reconstruct
-			// restart history when the orchestrator starts up.
+			// that number of tasks (this overrides TaskHistoryRetentionLimit).
+			// This is necessary to reconstruct restart history when the orchestrator starts up.
 			// TODO(aaronl): Consider hiding tasks beyond the normal
 			// retention limit in the UI.
 			// TODO(aaronl): There are some ways to cut down the
@@ -156,6 +179,7 @@ func (tr *TaskReaper) tick() {
 				taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
 			}
 
+			// A negative value for TaskHistoryRetentionLimit means task history is never cleaned up.
 			if taskHistory < 0 {
 				continue
 			}
@@ -164,6 +188,7 @@ func (tr *TaskReaper) tick() {
 
 			switch service.Spec.GetMode().(type) {
 			case *api.ServiceSpec_Replicated:
+				// Clean out the slot for which we received EventCreateTask.
 				var err error
 				historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
 				if err != nil {
@@ -171,6 +196,7 @@ func (tr *TaskReaper) tick() {
 				}
 
 			case *api.ServiceSpec_Global:
+				// For global services, clean out the node history.
 				tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
 				if err != nil {
 					continue
@@ -215,6 +241,7 @@ func (tr *TaskReaper) tick() {
 		}
 	})
 
+	// Perform cleanup.
 	if len(deleteTasks) > 0 {
 		tr.store.Batch(func(batch *store.Batch) error {
 			for taskID := range deleteTasks {
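
A reviewer's note on the retention rule these comments describe: below is a minimal, self-contained Go sketch of the per-service decision tick() makes. It is not swarmkit code; effectiveHistory and its parameters are invented names for illustration. The rule, per the diff above: if the restart policy sets MaxAttempts, keep MaxAttempts+1 tasks, overriding TaskHistoryRetentionLimit; otherwise a negative retention limit means task history is never cleaned up.

package main

import "fmt"

// effectiveHistory mirrors the retention decision described in tick():
// a restart policy with MaxAttempts set overrides the cluster-wide
// TaskHistoryRetentionLimit, keeping MaxAttempts+1 tasks so restart
// history can be reconstructed when the orchestrator starts up.
func effectiveHistory(retentionLimit int64, maxAttempts uint64, maxAttemptsSet bool) (keep int64, skipCleanup bool) {
	keep = retentionLimit
	if maxAttemptsSet {
		// Keep one more task than the number of allowed restart attempts.
		keep = int64(maxAttempts) + 1
	}
	if keep < 0 {
		// Negative retention limit: never clean up task history.
		return 0, true
	}
	return keep, false
}

func main() {
	// Retention limit 5, restart policy allows 8 attempts:
	// at least 9 historic tasks are kept per slot (replicated services)
	// or per node (global services).
	keep, skip := effectiveHistory(5, 8, true)
	fmt.Println(keep, skip) // 9 false

	// Negative retention limit with no MaxAttempts: cleanup is skipped.
	keep, skip = effectiveHistory(-1, 0, false)
	fmt.Println(keep, skip) // 0 true
}

Keeping one extra task beyond MaxAttempts is what lets the orchestrator tell, after a restart, whether the restart budget has already been exhausted.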