Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions internal/jobobject/iocp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package jobobject

import (
"context"
"fmt"
"sync"
"unsafe"

"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/queue"
"github.com/Microsoft/hcsshim/internal/winapi"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)

var (
ioInitOnce sync.Once
initIOErr error
Comment thread
dcantah marked this conversation as resolved.
Comment thread
katiewasnothere marked this conversation as resolved.
// Global iocp handle that will be re-used for every job object
ioCompletionPort windows.Handle
// Mapping of job handle to queue to place notifications in.
jobMap sync.Map
)

// MsgAllProcessesExited is a type representing a message that every process in a job has exited.
type MsgAllProcessesExited struct{}

// MsgUnimplemented represents a message that we are aware of, but that isn't implemented currently.
// This should not be treated as an error.
type MsgUnimplemented struct{}
Comment thread
ambarve marked this conversation as resolved.

// pollIOCP polls the io completion port forever.
func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
var (
overlapped uintptr
code uint32
key uintptr
)

for {
err := winapi.GetQueuedCompletionStatus(iocpHandle, &code, &key, (**windows.Overlapped)(unsafe.Pointer(&overlapped)), windows.INFINITE)
if err != nil {
log.G(ctx).WithError(err).Error("failed to poll for job object message")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is some non-transient error i.e some error which won't be resolved by the time of next iteration? We will keep looping with no way of breaking out of the loop?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From asking around, I couldn't find such a case for this call so I didn't worry about it. In go-winio, we check if nothing was filled in for the overlapped struct and if not we just panic so I think this is a little more friendly at least.https://github.com/microsoft/go-winio/blob/5b44b70ab3ab4d291a7c1d28afe7b4afeced0ed4/file.go#L162

Copy link
Copy Markdown
Contributor

@ambarve ambarve Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds fine. Do you think we should add a check for context Done()and break from this loop in that case?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, that's a bad idea. The ctx which was actually used during a call to this function could be cancelled but then that will stop this loop for all other jobs.

continue
}
if val, ok := jobMap.Load(key); ok {
msq, ok := val.(*queue.MessageQueue)
if !ok {
log.G(ctx).WithField("value", msq).Warn("encountered non queue type in job map")
continue
}
notification, err := parseMessage(code, overlapped)
if err != nil {
log.G(ctx).WithFields(logrus.Fields{
"code": code,
"overlapped": overlapped,
}).Warn("failed to parse job object message")
continue
}
if err := msq.Write(notification); err == queue.ErrQueueClosed {
// Write will only return an error when the queue is closed.
// The only time a queue would ever be closed is when we call `Close` on
// the job it belongs to which also removes it from the jobMap, so something
// went wrong here. We can't return as this is reading messages for all jobs
// so just log it and move on.
log.G(ctx).WithFields(logrus.Fields{
"code": code,
"overlapped": overlapped,
}).Warn("tried to write to a closed queue")
continue
}
} else {
log.G(ctx).Warn("received a message for a job not present in the mapping")
Comment thread
dcantah marked this conversation as resolved.
}
}
}

func parseMessage(code uint32, overlapped uintptr) (interface{}, error) {
// Check code and parse out relevant information related to that notification
// that we care about. For now all we handle is the message that all processes
// in the job have exited.
switch code {
case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
return MsgAllProcessesExited{}, nil
// Other messages for completeness and a check to make sure that if we fall
// into the default case that this is a code we don't know how to handle.
case winapi.JOB_OBJECT_MSG_END_OF_JOB_TIME:
case winapi.JOB_OBJECT_MSG_END_OF_PROCESS_TIME:
case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT:
case winapi.JOB_OBJECT_MSG_NEW_PROCESS:
case winapi.JOB_OBJECT_MSG_EXIT_PROCESS:
case winapi.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
case winapi.JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT:
case winapi.JOB_OBJECT_MSG_JOB_MEMORY_LIMIT:
case winapi.JOB_OBJECT_MSG_NOTIFICATION_LIMIT:
default:
return nil, fmt.Errorf("unknown job notification type: %d", code)
}
return MsgUnimplemented{}, nil
}

// Assigns an IO completion port to get notified of events for the registered job
// object.
func attachIOCP(job windows.Handle, iocp windows.Handle) error {
info := winapi.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{
CompletionKey: job,
CompletionPort: iocp,
}
_, err := windows.SetInformationJobObject(job, windows.JobObjectAssociateCompletionPortInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)))
Comment thread
ambarve marked this conversation as resolved.
return err
}
Loading