Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion api/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ enum TaskState {
SHUTDOWN = 640 [(gogoproto.enumvalue_customname)="TaskStateShutdown"]; // orchestrator requested shutdown
FAILED = 704 [(gogoproto.enumvalue_customname)="TaskStateFailed"]; // task execution failed with error
REJECTED = 768 [(gogoproto.enumvalue_customname)="TaskStateRejected"]; // task could not be executed here.
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"]; // The node on which this task is scheduled is Down for too long
// The main purpose of this state is to free up resources associated with service tasks on
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Go style for these comments: start with the identifier.

// TaskStateOrphaned ...

// unresponsive nodes without having to delete those tasks. This state is directly assigned
// to the task by the orchestrator.
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"];

// NOTE(stevvooe): The state of a task is actually a lamport clock, in that
// given two observations, the greater of the two can be considered
Expand Down
43 changes: 35 additions & 8 deletions manager/orchestrator/taskreaper/task_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ const (
// exist for the same service/instance or service/nodeid combination.
type TaskReaper struct {
store *store.MemoryStore

// taskHistory is the number of tasks to keep
taskHistory int64
dirty map[orchestrator.SlotTuple]struct{}
orphaned []string
stopChan chan struct{}
doneChan chan struct{}

// List of slot tuples to be inspected for task history cleanup.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, make sure to follow godoc style for these: start with the identifier.

dirty map[orchestrator.SlotTuple]struct{}

// List of serviceless orphaned tasks collected for cleanup.
orphaned []string
stopChan chan struct{}
doneChan chan struct{}
}

// New creates a new TaskReaper.
Expand All @@ -41,7 +46,11 @@ func New(store *store.MemoryStore) *TaskReaper {
}
}

// Run is the TaskReaper's main loop.
// Run is the TaskReaper's watch loop which collects candidates for cleanup.
// Task history is mainly used in task restarts but is also available for administrative purposes.
// Note that the task history is stored per-slot-per-service for replicated services
// and per-node-per-service for global services. History does not apply to serviceless tasks
// since they are not attached to a service.
func (tr *TaskReaper) Run(ctx context.Context) {
watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})

Expand All @@ -59,6 +68,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
}

// On startup, scan the entire store and inspect orphaned tasks from previous life.
tasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
if err != nil {
log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
Expand All @@ -67,21 +77,31 @@ func (tr *TaskReaper) Run(ctx context.Context) {

if len(tasks) > 0 {
for _, t := range tasks {
// Do not reap service tasks immediately
// Do not reap service tasks immediately.
// Let them go through the regular history cleanup process
// of checking TaskHistoryRetentionLimit.
if t.ServiceID != "" {
continue
}

// Serviceless tasks can be cleaned up right away since they are not attached to a service.
tr.orphaned = append(tr.orphaned, t.ID)
}

// Clean up orphaned serviceless tasks right away.
if len(tr.orphaned) > 0 {
tr.tick()
}
}

// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
// whichever happens first.
timer := time.NewTimer(reaperBatchingInterval)

// Watch for:
// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
// 2. EventUpdateTask for cleaning serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED).
// 3. EventUpdateCluster for TaskHistoryRetentionLimit update.
for {
select {
case event := <-watcher:
Expand Down Expand Up @@ -118,6 +138,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
}
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
if len(tr.dirty) == 0 && len(tr.orphaned) == 0 {
return
Expand All @@ -131,6 +152,8 @@ func (tr *TaskReaper) tick() {
for _, tID := range tr.orphaned {
deleteTasks[tID] = struct{}{}
}

// Check history of dirty tasks for cleanup.
tr.store.View(func(tx store.ReadTx) {
for dirty := range tr.dirty {
service := store.GetService(tx, dirty.ServiceID)
Expand All @@ -141,8 +164,8 @@ func (tr *TaskReaper) tick() {
taskHistory := tr.taskHistory

// If MaxAttempts is set, keep at least one more than
// that number of tasks. This is necessary reconstruct
// restart history when the orchestrator starts up.
// that number of tasks (this overrides TaskHistoryRetentionLimit).
// This is necessary to reconstruct restart history when the orchestrator starts up.
// TODO(aaronl): Consider hiding tasks beyond the normal
// retention limit in the UI.
// TODO(aaronl): There are some ways to cut down the
Expand All @@ -156,6 +179,7 @@ func (tr *TaskReaper) tick() {
taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
}

// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
if taskHistory < 0 {
continue
}
Expand All @@ -164,13 +188,15 @@ func (tr *TaskReaper) tick() {

switch service.Spec.GetMode().(type) {
case *api.ServiceSpec_Replicated:
// Clean out the slot for which we received EventCreateTask.
var err error
historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
if err != nil {
continue
}

case *api.ServiceSpec_Global:
// Clean out the node history in case of global services.
tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
if err != nil {
continue
Expand Down Expand Up @@ -215,6 +241,7 @@ func (tr *TaskReaper) tick() {
}
})

// Perform cleanup.
if len(deleteTasks) > 0 {
tr.store.Batch(func(batch *store.Batch) error {
for taskID := range deleteTasks {
Expand Down