Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 179 additions & 9 deletions internal/hcs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"syscall"
"time"

"github.com/Microsoft/hcsshim/internal/cow"
"github.com/Microsoft/hcsshim/internal/hcs/schema1"
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
"github.com/Microsoft/hcsshim/internal/jobobject"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/logfields"
"github.com/Microsoft/hcsshim/internal/oc"
"github.com/Microsoft/hcsshim/internal/timeout"
"github.com/Microsoft/hcsshim/internal/vmcompute"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

Expand All @@ -30,7 +35,8 @@ type System struct {
waitBlock chan struct{}
waitError error
exitError error
os, typ string
os, typ, owner string
startTime time.Time
}

func newSystem(id string) *System {
Expand All @@ -40,6 +46,11 @@ func newSystem(id string) *System {
}
}

// Implementation detail for silo naming, this should NOT be relied upon very heavily.
func siloNameFmt(containerID string) string {
return fmt.Sprintf(`\Container_%s`, containerID)
}

// CreateComputeSystem creates a new compute system with the given configuration but does not start it.
func CreateComputeSystem(ctx context.Context, id string, hcsDocumentInterface interface{}) (_ *System, err error) {
operation := "hcs::CreateComputeSystem"
Expand Down Expand Up @@ -129,6 +140,7 @@ func (computeSystem *System) getCachedProperties(ctx context.Context) error {
}
computeSystem.typ = strings.ToLower(props.SystemType)
computeSystem.os = strings.ToLower(props.RuntimeOSType)
computeSystem.owner = strings.ToLower(props.Owner)
if computeSystem.os == "" && computeSystem.typ == "container" {
// Pre-RS5 HCS did not return the OS, but it only supported containers
// that ran Windows.
Expand Down Expand Up @@ -197,7 +209,7 @@ func (computeSystem *System) Start(ctx context.Context) (err error) {
if err != nil {
return makeSystemError(computeSystem, operation, err, events)
}

computeSystem.startTime = time.Now()
return nil
}

Expand Down Expand Up @@ -326,11 +338,115 @@ func (computeSystem *System) Properties(ctx context.Context, types ...schema1.Pr
return properties, nil
}

// PropertiesV2 returns the requested container properties targeting a V2 schema container.
func (computeSystem *System) PropertiesV2(ctx context.Context, types ...hcsschema.PropertyType) (*hcsschema.Properties, error) {
computeSystem.handleLock.RLock()
defer computeSystem.handleLock.RUnlock()
// queryInProc handles querying for container properties without reaching out to HCS. `props`
// will be updated to contain any data returned from the queries present in `types`. If any properties
// failed to be queried they will be tallied up and returned in as the first return value. Failures on
// query are NOT considered errors; the only failure case for this method is if the containers job object
// cannot be opened.
func (computeSystem *System) queryInProc(ctx context.Context, props *hcsschema.Properties, types []hcsschema.PropertyType) ([]hcsschema.PropertyType, error) {
// In the future we can make use of some new functionality in the HCS that allows you
// to pass a job object for HCS to use for the container. Currently, the only way we'll
// be able to open the job/silo is if we're running as SYSTEM.
jobOptions := &jobobject.Options{
Comment thread
helsaawy marked this conversation as resolved.
UseNTVariant: true,
Name: siloNameFmt(computeSystem.id),
Comment thread
dcantah marked this conversation as resolved.
}
job, err := jobobject.Open(ctx, jobOptions)
if err != nil {
return nil, err
}
defer job.Close()

var fallbackQueryTypes []hcsschema.PropertyType
for _, propType := range types {
switch propType {
case hcsschema.PTStatistics:
// Handle a bad caller asking for the same type twice. No use in re-querying if this is
// filled in already.
if props.Statistics == nil {
props.Statistics, err = computeSystem.statisticsInProc(job)
if err != nil {
log.G(ctx).WithError(err).Warn("failed to get statistics in-proc")

fallbackQueryTypes = append(fallbackQueryTypes, propType)
Comment thread
kevpar marked this conversation as resolved.
}
}
default:
fallbackQueryTypes = append(fallbackQueryTypes, propType)
}
}

return fallbackQueryTypes, nil
}

// statisticsInProc emulates what HCS does to grab statistics for a given container with a small
// change to make grabbing the private working set total much more efficient.
func (computeSystem *System) statisticsInProc(job *jobobject.JobObject) (*hcsschema.Statistics, error) {
// Start timestamp for these stats before we grab them to match HCS
timestamp := time.Now()

memInfo, err := job.QueryMemoryStats()
if err != nil {
return nil, err
}

processorInfo, err := job.QueryProcessorStats()
if err != nil {
return nil, err
}

storageInfo, err := job.QueryStorageStats()
if err != nil {
return nil, err
}

// This calculates the private working set more efficiently than HCS does. HCS calls NtQuerySystemInformation
// with the class SystemProcessInformation which returns an array containing system information for *every*
// process running on the machine. They then grab the pids that are running in the container and filter down
// the entries in the array to only what's running in that silo and start tallying up the total. This doesn't
// work well as performance should get worse if more processess are running on the machine in general and not
// just in the container. All of the additional information besides the WorkingSetPrivateSize field is ignored
// as well which isn't great and is wasted work to fetch.
//
// HCS only let's you grab statistics in an all or nothing fashion, so we can't just grab the private
// working set ourselves and ask for everything else seperately. The optimization we can make here is
// to open the silo ourselves and do the same queries for the rest of the info, as well as calculating
// the private working set in a more efficient manner by:
//
// 1. Find the pids running in the silo
// 2. Get a process handle for every process (only need PROCESS_QUERY_LIMITED_INFORMATION access)
// 3. Call NtQueryInformationProcess on each process with the class ProcessVmCounters
// 4. Tally up the total using the field PrivateWorkingSetSize in VM_COUNTERS_EX2.
privateWorkingSet, err := job.QueryPrivateWorkingSet()
if err != nil {
return nil, err
}

return &hcsschema.Statistics{
Timestamp: timestamp,
ContainerStartTime: computeSystem.startTime,
Uptime100ns: uint64(time.Since(computeSystem.startTime).Nanoseconds()) / 100,
Memory: &hcsschema.MemoryStats{
MemoryUsageCommitBytes: memInfo.JobMemory,
MemoryUsageCommitPeakBytes: memInfo.PeakJobMemoryUsed,
MemoryUsagePrivateWorkingSetBytes: privateWorkingSet,
},
Processor: &hcsschema.ProcessorStats{
RuntimeKernel100ns: uint64(processorInfo.TotalKernelTime),
RuntimeUser100ns: uint64(processorInfo.TotalUserTime),
TotalRuntime100ns: uint64(processorInfo.TotalKernelTime + processorInfo.TotalUserTime),
},
Storage: &hcsschema.StorageStats{
ReadCountNormalized: uint64(storageInfo.ReadStats.IoCount),
ReadSizeBytes: storageInfo.ReadStats.TotalSize,
WriteCountNormalized: uint64(storageInfo.WriteStats.IoCount),
WriteSizeBytes: storageInfo.WriteStats.TotalSize,
},
}, nil
}

// hcsPropertiesV2Query is a helper to make a HcsGetComputeSystemProperties call using the V2 schema property types.
func (computeSystem *System) hcsPropertiesV2Query(ctx context.Context, types []hcsschema.PropertyType) (*hcsschema.Properties, error) {
operation := "hcs::System::PropertiesV2"

queryBytes, err := json.Marshal(hcsschema.PropertyQuery{PropertyTypes: types})
Expand All @@ -347,12 +463,66 @@ func (computeSystem *System) PropertiesV2(ctx context.Context, types ...hcsschem
if propertiesJSON == "" {
return nil, ErrUnexpectedValue
}
properties := &hcsschema.Properties{}
if err := json.Unmarshal([]byte(propertiesJSON), properties); err != nil {
props := &hcsschema.Properties{}
if err := json.Unmarshal([]byte(propertiesJSON), props); err != nil {
return nil, makeSystemError(computeSystem, operation, err, nil)
}

return properties, nil
return props, nil
}

// PropertiesV2 returns the requested compute systems properties targeting a V2 schema compute system.
func (computeSystem *System) PropertiesV2(ctx context.Context, types ...hcsschema.PropertyType) (_ *hcsschema.Properties, err error) {
computeSystem.handleLock.RLock()
defer computeSystem.handleLock.RUnlock()

// Let HCS tally up the total for VM based queries instead of querying ourselves.
if computeSystem.typ != "container" {
return computeSystem.hcsPropertiesV2Query(ctx, types)
}

// Define a starter Properties struct with the default fields returned from every
// query. Owner is only returned from Statistics but it's harmless to include.
properties := &hcsschema.Properties{
Id: computeSystem.id,
SystemType: computeSystem.typ,
RuntimeOsType: computeSystem.os,
Owner: computeSystem.owner,
}

logEntry := log.G(ctx)
// First lets try and query ourselves without reaching to HCS. If any of the queries fail
// we'll take note and fallback to querying HCS for any of the failed types.
fallbackTypes, err := computeSystem.queryInProc(ctx, properties, types)
if err == nil && len(fallbackTypes) == 0 {
return properties, nil
} else if err != nil {
logEntry.WithError(fmt.Errorf("failed to query compute system properties in-proc: %w", err))
fallbackTypes = types
}

logEntry.WithFields(logrus.Fields{
logfields.ContainerID: computeSystem.id,
"propertyTypes": fallbackTypes,
}).Info("falling back to HCS for property type queries")

hcsProperties, err := computeSystem.hcsPropertiesV2Query(ctx, fallbackTypes)
if err != nil {
return nil, err
}

// Now add in anything that we might have successfully queried in process.
if properties.Statistics != nil {
hcsProperties.Statistics = properties.Statistics
hcsProperties.Owner = properties.Owner
}

// For future support for querying processlist in-proc as well.
if properties.ProcessList != nil {
Comment thread
kevpar marked this conversation as resolved.
hcsProperties.ProcessList = properties.ProcessList
}

return hcsProperties, nil
}

// Pause pauses the execution of the computeSystem. This feature is not enabled in TP5.
Expand Down
22 changes: 11 additions & 11 deletions internal/jobcontainers/jobcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ func (c *JobContainer) PropertiesV2(ctx context.Context, types ...hcsschema.Prop
return nil, errors.New("PTStatistics is the only supported property type for job containers")
}

// Start timestamp before we grab the stats to match HCS' behavior
timestamp := time.Now()

memInfo, err := c.job.QueryMemoryStats()
if err != nil {
return nil, errors.Wrap(err, "failed to query for job containers memory information")
Expand All @@ -435,18 +438,15 @@ func (c *JobContainer) PropertiesV2(ctx context.Context, types ...hcsschema.Prop
return nil, errors.Wrap(err, "failed to query for job containers storage information")
}

var privateWorkingSet uint64
err = forEachProcessInfo(c.job, func(procInfo *winapi.SYSTEM_PROCESS_INFORMATION) {
privateWorkingSet += uint64(procInfo.WorkingSetPrivateSize)
})
privateWorkingSet, err := c.job.QueryPrivateWorkingSet()
if err != nil {
return nil, errors.Wrap(err, "failed to get private working set for container")
return nil, fmt.Errorf("failed to get private working set for container: %w", err)
}

return &hcsschema.Properties{
Statistics: &hcsschema.Statistics{
Timestamp: time.Now(),
Uptime100ns: uint64(time.Since(c.startTimestamp)) / 100,
Timestamp: timestamp,
Uptime100ns: uint64(time.Since(c.startTimestamp).Nanoseconds()) / 100,
ContainerStartTime: c.startTimestamp,
Memory: &hcsschema.MemoryStats{
MemoryUsageCommitBytes: memInfo.JobMemory,
Expand All @@ -459,10 +459,10 @@ func (c *JobContainer) PropertiesV2(ctx context.Context, types ...hcsschema.Prop
TotalRuntime100ns: uint64(processorInfo.TotalKernelTime + processorInfo.TotalUserTime),
},
Storage: &hcsschema.StorageStats{
ReadCountNormalized: storageInfo.IoInfo.ReadOperationCount,
ReadSizeBytes: storageInfo.IoInfo.ReadTransferCount,
WriteCountNormalized: storageInfo.IoInfo.WriteOperationCount,
WriteSizeBytes: storageInfo.IoInfo.WriteTransferCount,
ReadCountNormalized: uint64(storageInfo.ReadStats.IoCount),
ReadSizeBytes: storageInfo.ReadStats.TotalSize,
WriteCountNormalized: uint64(storageInfo.WriteStats.IoCount),
WriteSizeBytes: storageInfo.WriteStats.TotalSize,
},
},
}, nil
Expand Down
67 changes: 64 additions & 3 deletions internal/jobobject/jobobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,18 +430,20 @@ func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_
}

// QueryStorageStats gets the storage (I/O) stats for the job object.
func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION, error) {
func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION, error) {
job.handleLock.RLock()
Comment thread
dcantah marked this conversation as resolved.
defer job.handleLock.RUnlock()

if job.handle == 0 {
return nil, ErrAlreadyClosed
}

info := winapi.JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION{}
info := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
}
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicAndIoAccountingInformation,
winapi.JobObjectIoAttribution,
uintptr(unsafe.Pointer(&info)),
uint32(unsafe.Sizeof(info)),
nil,
Expand Down Expand Up @@ -546,3 +548,62 @@ func (job *JobObject) PromoteToSilo() error {
func (job *JobObject) isSilo() bool {
return atomic.LoadUint32(&job.isAppSilo) == 1
}

// QueryPrivateWorkingSet returns the private working set size for the job. This is calculated by adding up the
// private working set for every process running in the job.
func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
pids, err := job.Pids()
if err != nil {
return 0, err
}

openAndQueryWorkingSet := func(pid uint32) (uint64, error) {
h, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, pid)
if err != nil {
// Continue to the next if OpenProcess doesn't return a valid handle (fails). Handles a
// case where one of the pids in the job exited before we open.
return 0, nil
}
defer func() {
_ = windows.Close(h)
}()
// Check if the process is actually running in the job still. There's a small chance
// that the process could have exited and had its pid re-used between grabbing the pids
// in the job and opening the handle to it above.
var inJob int32
if err := winapi.IsProcessInJob(h, job.handle, &inJob); err != nil {
// This shouldn't fail unless we have incorrect access rights which we control
// here so probably best to error out if this failed.
return 0, err
}
// Don't report stats for this process as it's not running in the job. This shouldn't be
// an error condition though.
if inJob == 0 {
return 0, nil
}

var vmCounters winapi.VM_COUNTERS_EX2
status := winapi.NtQueryInformationProcess(
h,
winapi.ProcessVmCounters,
uintptr(unsafe.Pointer(&vmCounters)),
uint32(unsafe.Sizeof(vmCounters)),
nil,
)
if !winapi.NTSuccess(status) {
return 0, fmt.Errorf("failed to query information for process: %w", winapi.RtlNtStatusToDosError(status))
}
return uint64(vmCounters.PrivateWorkingSetSize), nil
}

var jobWorkingSetSize uint64
for _, pid := range pids {
Comment thread
dcantah marked this conversation as resolved.
workingSet, err := openAndQueryWorkingSet(pid)
if err != nil {
return 0, err
}
jobWorkingSetSize += workingSet
}

return jobWorkingSetSize, nil
}
Loading