diff --git a/pkg/kubelet/cadvisor/helpers_linux.go b/pkg/kubelet/cadvisor/helpers_linux.go index ee6889b9d571a..3408aa90ae5eb 100644 --- a/pkg/kubelet/cadvisor/helpers_linux.go +++ b/pkg/kubelet/cadvisor/helpers_linux.go @@ -39,13 +39,6 @@ func (i *imageFsInfoProvider) ImageFsInfoLabel() (string, error) { switch i.runtime { case types.DockerContainerRuntime: return cadvisorfs.LabelDockerImages, nil - case types.RemoteContainerRuntime: - // This is a temporary workaround to get stats for cri-o from cadvisor - // and should be removed. - // Related to https://github.com/kubernetes/kubernetes/issues/51798 - if i.runtimeEndpoint == CrioSocket || i.runtimeEndpoint == "unix://"+CrioSocket { - return cadvisorfs.LabelCrioImages, nil - } } return "", fmt.Errorf("no imagefs label for configured runtime") } diff --git a/pkg/kubelet/cadvisor/util.go b/pkg/kubelet/cadvisor/util.go index 6020abd4dee1a..d53a2cb71b354 100644 --- a/pkg/kubelet/cadvisor/util.go +++ b/pkg/kubelet/cadvisor/util.go @@ -21,20 +21,12 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi2 "github.com/google/cadvisor/info/v2" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) -const ( - // CrioSocket is the path to the CRI-O socket. - // Please keep this in sync with the one in: - // github.com/google/cadvisor/tree/master/container/crio/client.go - CrioSocket = "/var/run/crio/crio.sock" -) - -// CapacityFromMachineInfo returns the capacity of the resources from the machine info. func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) v1.ResourceList { c := v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity( @@ -74,6 +66,5 @@ func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceLis // be removed. Related issue: // https://github.com/kubernetes/kubernetes/issues/51798 func UsingLegacyCadvisorStats(runtime, runtimeEndpoint string) bool { - return (runtime == kubetypes.DockerContainerRuntime && goruntime.GOOS == "linux") || - runtimeEndpoint == CrioSocket || runtimeEndpoint == "unix://"+CrioSocket + return (runtime == kubetypes.DockerContainerRuntime && goruntime.GOOS == "linux") } diff --git a/pkg/kubelet/cadvisor/util_test.go b/pkg/kubelet/cadvisor/util_test.go index 2fa09e54f2fbb..5d6df709d1404 100644 --- a/pkg/kubelet/cadvisor/util_test.go +++ b/pkg/kubelet/cadvisor/util_test.go @@ -23,9 +23,7 @@ import ( "reflect" "testing" - "github.com/google/cadvisor/container/crio" info "github.com/google/cadvisor/info/v1" - "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -52,7 +50,3 @@ func TestCapacityFromMachineInfoWithHugePagesEnable(t *testing.T) { t.Errorf("when set hugepages true, got resource list %v, want %v", actual, expected) } } - -func TestCrioSocket(t *testing.T) { - assert.EqualValues(t, CrioSocket, crio.CrioSocket, "CrioSocket in this package must equal the one in github.com/google/cadvisor/container/crio/client.go") -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ae22fcb0294dc..4fa1026646bf1 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -2336,10 +2336,19 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. kl.podManager.UpdatePod(pod) + sidecarsStatus := status.GetSidecarsStatus(pod) + // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. if status.NeedToReconcilePodReadiness(pod) { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + } else if sidecarsStatus.ContainersWaiting { + // if containers aren't running and the sidecars are all ready trigger a sync so that the containers get started + if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady { + klog.Infof("Pod: %s: sidecars: sidecars are ready, dispatching work", klog.KObj(pod)) + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + } } // After an evicted pod is synced, all dead containers in the pod can be removed. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 9c762ac309e3b..0302ef1755ae9 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -284,9 +284,9 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String()) m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg) - if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil { - klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod), - "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String()) + if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, 0); err != nil { + klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v", + container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err) } return msg, ErrPostStartHook } @@ -631,6 +631,12 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID l := getContainerInfoFromLabels(s.Labels) a := getContainerInfoFromAnnotations(s.Annotations) + + annotations := make(map[string]string) + if a.Sidecar { + annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", l.ContainerName)] = "Sidecar" + } + // Notice that the followings are not full spec. The container killing code should not use // un-restored fields. 
pod = &v1.Pod{ @@ -639,6 +645,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID Name: l.PodName, Namespace: l.PodNamespace, DeletionGracePeriodSeconds: a.PodDeletionGracePeriod, + Annotations: annotations, }, Spec: v1.PodSpec{ TerminationGracePeriodSeconds: a.PodTerminationGracePeriod, @@ -660,12 +667,19 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID // killContainer kills a container through the following steps: // * Run the pre-stop lifecycle hooks (if applicable). // * Stop the container. -func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error { +func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodDuration time.Duration) error { var containerSpec *v1.Container if pod != nil { if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil { - return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q", - containerName, containerID.String(), format.Pod(pod), message) + // after a kubelet restart, it's not 100% certain that the + // pod we're given has the container we need in the spec + // -- we try to recover that here. + restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID) + if err != nil { + return fmt.Errorf("failed to get containerSpec %q(id=%q) in pod %q when killing container for reason %q. error: %v", + containerName, containerID.String(), format.Pod(pod), message, err) + } + pod, containerSpec = restoredPod, restoredContainer } } else { // Restore necessary information if one of the specs is nil. @@ -714,10 +728,9 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec if gracePeriod < minimumGracePeriodInSeconds { gracePeriod = minimumGracePeriodInSeconds } - if gracePeriodOverride != nil { - gracePeriod = *gracePeriodOverride - klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID, - "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod) + if gracePeriodDuration > 0 { + gracePeriod = int64(gracePeriodDuration.Seconds()) + klog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod) } klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID, @@ -736,18 +749,54 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec } // killContainersWithSyncResult kills all pod's containers with sync results. 
-func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
+func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodDuration time.Duration) (syncResults []*kubecontainer.SyncResult) {
+	// split out sidecars and non-sidecars
+	var (
+		sidecars    []*kubecontainer.Container
+		nonSidecars []*kubecontainer.Container
+	)
+	for _, container := range runningPod.Containers {
+		if isSidecar(pod, container.Name) {
+			sidecars = append(sidecars, container)
+		} else {
+			nonSidecars = append(nonSidecars, container)
+		}
+	}
+
 	containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
-	wg := sync.WaitGroup{}
 
+	// non-sidecars first
+	start := time.Now()
+	klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodDuration)
+	nonSidecarsWg := sync.WaitGroup{}
+	nonSidecarsWg.Add(len(nonSidecars))
+	for _, container := range nonSidecars {
+		go func(container *kubecontainer.Container) {
+			defer utilruntime.HandleCrash()
+			defer nonSidecarsWg.Done()
+			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
+			if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil {
+				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
+			}
+			containerResults <- killContainerResult
+		}(container)
+	}
+	nonSidecarsWg.Wait()
 
-	wg.Add(len(runningPod.Containers))
-	for _, container := range runningPod.Containers {
+	gracePeriodDuration = gracePeriodDuration - time.Since(start)
+	if gracePeriodDuration < 0 {
+		gracePeriodDuration = 0
+	}
+
+	// then sidecars
+	klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d sidecars, %s left", runningPod.Name, len(sidecars), gracePeriodDuration)
+	wg := sync.WaitGroup{}
+	wg.Add(len(sidecars))
+	for _, container := range sidecars {
 		go func(container *kubecontainer.Container) {
 			defer utilruntime.HandleCrash()
 			defer wg.Done()
 			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
-			if err := m.killContainer(pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil {
+			if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil {
 				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
 				// Use runningPod for logging as the pod passed in could be *nil*.
 				klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,
@@ -757,6 +806,7 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, ru
 		}(container)
 	}
 	wg.Wait()
+	close(containerResults)
 
 	for containerResult := range containerResults {
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go
index 91d730976113d..173583897ba53 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -122,7 +122,7 @@ func TestKillContainer(t *testing.T) { } for _, test := range tests { - err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, "", &test.gracePeriodOverride) + err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, "", time.Duration(test.gracePeriodOverride)*time.Second) if test.succeed != (err == nil) { t.Errorf("%s: expected %v, got %v (%v)", test.caseName, test.succeed, (err == nil), err) } @@ -292,7 +292,7 @@ func TestLifeCycleHook(t *testing.T) { // Configured and works as expected t.Run("PreStop-CMDExec", func(t *testing.T) { testPod.Spec.Containers[0].Lifecycle = cmdLifeCycle - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second) if fakeRunner.Cmd[0] != cmdLifeCycle.PreStop.Exec.Command[0] { t.Errorf("CMD Prestop hook was not invoked") } @@ -302,7 +302,7 @@ func TestLifeCycleHook(t *testing.T) { t.Run("PreStop-HTTPGet", func(t *testing.T) { defer func() { fakeHTTP.url = "" }() testPod.Spec.Containers[0].Lifecycle = httpLifeCycle - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second) if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) { t.Errorf("HTTP Prestop hook was not invoked") @@ -316,7 +316,7 @@ func TestLifeCycleHook(t *testing.T) { testPod.DeletionGracePeriodSeconds = &gracePeriodLocal testPod.Spec.TerminationGracePeriodSeconds = &gracePeriodLocal - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriodLocal) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriodLocal)*time.Second) if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) { t.Errorf("HTTP Should not execute when gracePeriod is 0") diff --git a/pkg/kubelet/kuberuntime/kuberuntime_gc.go b/pkg/kubelet/kuberuntime/kuberuntime_gc.go index 7f3f45e4072ab..379815463dd20 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_gc.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_gc.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -137,8 +137,8 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int ID: containers[i].id, } message := "Container is in unknown state, try killing it before removal" - if err := cgc.manager.killContainer(nil, id, containers[i].name, message, reasonUnknown, nil); err != nil { - klog.ErrorS(err, "Failed to stop container", "containerID", containers[i].id) + if err := cgc.manager.killContainer(nil, id, containers[i].name, message, reasonUnknown, 0); err != nil { + klog.Errorf("Failed to stop container %q: %v", containers[i].id, err) continue } } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 33a84a12475f0..50f12a095bb1e 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -53,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" + "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/sysctl" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/cache" @@ -539,6 +540,14 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo return cStatus.ExitCode == 0 } +func isSidecar(pod *v1.Pod, containerName string) bool { + if pod == nil { + klog.V(5).Infof("isSidecar: pod is nil, so returning false") + return false + } + return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar" +} + // computePodActions checks whether the pod spec has changed and returns the changes if true. func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) @@ -553,6 +562,17 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo), } + var sidecarNames []string + for _, container := range pod.Spec.Containers { + if isSidecar(pod, container.Name) { + sidecarNames = append(sidecarNames, container.Name) + } + } + + // determine sidecar status + sidecarStatus := status.GetSidecarsStatus(pod) + klog.Infof("Pod: %s, sidecars: %s, status: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), sidecarNames, sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting) + // If we need to (re-)create the pod sandbox, everything will need to be // killed and recreated, and init containers should be purged. 
if createPodSandbox { @@ -591,7 +611,22 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku changes.NextInitContainerToStart = &pod.Spec.InitContainers[0] return changes } - changes.ContainersToStart = containersToStart + if len(sidecarNames) > 0 { + for idx, c := range pod.Spec.Containers { + if isSidecar(pod, c.Name) { + changes.ContainersToStart = append(changes.ContainersToStart, idx) + } + } + return changes + } + // Start all containers by default but exclude the ones that + // succeeded if RestartPolicy is OnFailure + for idx, c := range pod.Spec.Containers { + if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure { + continue + } + changes.ContainersToStart = append(changes.ContainersToStart, idx) + } return changes } @@ -637,6 +672,21 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku keepCount := 0 // check the status of containers. for idx, container := range pod.Spec.Containers { + // this works because in other cases, if it was a sidecar, we + // are always allowed to handle the container. + // + // if it is a non-sidecar, and there are no sidecars, then + // we're are also always allowed to restart the container. + // + // if there are sidecars, then we can only restart non-sidecars under + // the following conditions: + // - the non-sidecars have run before (i.e. they are not in a Waiting state) OR + // - the sidecars are ready (we're starting them for the first time) + if !isSidecar(pod, container.Name) && sidecarStatus.SidecarsPresent && sidecarStatus.ContainersWaiting && !sidecarStatus.SidecarsReady { + klog.Infof("Pod: %s, Container: %s, sidecar=%v skipped: Present=%v,Ready=%v,ContainerWaiting=%v", format.Pod(pod), container.Name, isSidecar(pod, container.Name), sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting) + continue + } + containerStatus := podStatus.FindContainerStatusByName(container.Name) // Call internal container post-stop lifecycle hook for any non-running container so that any @@ -653,7 +703,8 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku // need to restart it. 
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning { if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) { - klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod)) + message := fmt.Sprintf("%s: Container %+v is dead, but RestartPolicy says that we should restart it.", pod.Name, container) + klog.V(3).Infof(message) changes.ContainersToStart = append(changes.ContainersToStart, idx) if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown { // If container is in unknown state, we don't know whether it @@ -711,6 +762,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku } if keepCount == 0 && len(changes.ContainersToStart) == 0 { + klog.Infof("Pod: %s: KillPod=true", format.Pod(pod)) changes.KillPod = true } @@ -766,7 +818,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod)) killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name) result.AddSyncResult(killContainerResult) - if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil { + if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, 0); err != nil { killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod)) return @@ -991,6 +1043,35 @@ func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Contain // only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data. // it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios. func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error { + // if the pod is nil, we need to recover it, so we can get the + // grace period and also the sidecar status. + if pod == nil { + for _, container := range runningPod.Containers { + klog.Infof("Pod: %s, KillPod: pod nil, trying to restore from container %s", runningPod.Name, container.ID) + podSpec, _, err := m.restoreSpecsFromContainerLabels(container.ID) + if err != nil { + klog.Errorf("Pod: %s, KillPod: couldn't restore: %s", runningPod.Name, container.ID) + continue + } + pod = podSpec + break + } + } + + if gracePeriodOverride == nil && pod != nil { + switch { + case pod.DeletionGracePeriodSeconds != nil: + gracePeriodOverride = pod.DeletionGracePeriodSeconds + case pod.Spec.TerminationGracePeriodSeconds != nil: + gracePeriodOverride = pod.Spec.TerminationGracePeriodSeconds + } + } + + if gracePeriodOverride == nil || *gracePeriodOverride < minimumGracePeriodInSeconds { + min := int64(minimumGracePeriodInSeconds) + gracePeriodOverride = &min + } + err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride) return err.Error() } @@ -998,7 +1079,12 @@ func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontaine // killPodWithSyncResult kills a runningPod and returns SyncResult. // Note: The pod passed in could be *nil* when kubelet restarted. 
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { - killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride) + gracePeriodDuration := 0 * time.Second + if gracePeriodOverride != nil { + gracePeriodDuration = time.Duration(*gracePeriodOverride) * time.Second + } + + killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodDuration) for _, containerResult := range killContainerResults { result.AddSyncResult(containerResult) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 8bec90c8f84e5..ad8fa47a943e5 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -17,6 +17,7 @@ limitations under the License. package kuberuntime import ( + "fmt" "path/filepath" "reflect" "sort" @@ -1371,6 +1372,13 @@ func makeBasePodAndStatusWithInitContainers() (*v1.Pod, *kubecontainer.PodStatus return pod, status } +func makeBasePodAndStatusWithSidecar() (*v1.Pod, *kubecontainer.PodStatus) { + pod, status := makeBasePodAndStatus() + pod.Annotations = map[string]string{fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", pod.Spec.Containers[1].Name): "Sidecar"} + status.ContainerStatuses[1].Hash = kubecontainer.HashContainer(&pod.Spec.Containers[1]) + return pod, status +} + func TestComputePodActionsWithInitAndEphemeralContainers(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EphemeralContainers, true)() // Make sure existing test cases pass with feature enabled @@ -1551,6 +1559,236 @@ func TestSyncPodWithSandboxAndDeletedPod(t *testing.T) { // This will return an error if the pod has _not_ been deleted. assert.NoError(t, result.Error()) } +func TestComputePodActionsWithSidecar(t *testing.T) { + _, _, m, err := createTestRuntimeManager() + require.NoError(t, err) + + // Createing a pair reference pod and status for the test cases to refer + // the specific fields. + basePod, baseStatus := makeBasePodAndStatusWithSidecar() + for desc, test := range map[string]struct { + mutatePodFn func(*v1.Pod) + mutateStatusFn func(*kubecontainer.PodStatus) + actions podActions + }{ + "Start sidecar containers before non-sidecars when creating a new pod": { + mutateStatusFn: func(status *kubecontainer.PodStatus) { + // No container or sandbox exists. 
+ status.SandboxStatuses = []*runtimeapi.PodSandboxStatus{} + status.ContainerStatuses = []*kubecontainer.Status{} + }, + actions: podActions{ + KillPod: true, + CreateSandbox: true, + Attempt: uint32(0), + ContainersToStart: []int{1}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Don't start non-sidecars until sidecars are ready": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Don't start non-sidecars until sidecars are ready, no known previous container state for all non-sidecar": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo2", + Ready: false, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Don't start non-sidecars until sidecars are ready, no known previous container state for one non-sidecar": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{}, + }, + { + Name: "foo2", + Ready: false, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Start non-sidecars when sidecars are ready": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: true, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{0, 2}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Restart only sidecars while non-sidecars are waiting": { + mutatePodFn: func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyAlways + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status 
*kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + status.ContainerStatuses[i].State = kubecontainer.ContainerStateExited + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{1}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Restart running non-sidecars despite sidecar becoming not ready ": { + mutatePodFn: func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyAlways + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{StartedAt: metav1.Now()}, + }, + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{StartedAt: metav1.Now()}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = kubecontainer.ContainerStateExited + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{0, 2}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + } { + pod, status := makeBasePodAndStatusWithSidecar() + if test.mutatePodFn != nil { + test.mutatePodFn(pod) + } + if test.mutateStatusFn != nil { + test.mutateStatusFn(status) + } + actions := m.computePodActions(pod, status) + verifyActions(t, &test.actions, &actions, desc) + } +} func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontainer.PodStatus) { pod, status := makeBasePodAndStatus() diff --git a/pkg/kubelet/kuberuntime/labels.go b/pkg/kubelet/kuberuntime/labels.go index 146eadbbb2f70..2da3573852bfe 100644 --- a/pkg/kubelet/kuberuntime/labels.go +++ b/pkg/kubelet/kuberuntime/labels.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -38,6 +38,7 @@ const ( containerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy" containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" containerPortsLabel = "io.kubernetes.container.ports" + containerSidecarLabel = "com.lyft.sidecars.container-lifecycle" ) type labeledPodSandboxInfo struct { @@ -63,6 +64,7 @@ type labeledContainerInfo struct { type annotatedContainerInfo struct { Hash uint64 RestartCount int + Sidecar bool PodDeletionGracePeriod *int64 PodTerminationGracePeriod *int64 TerminationMessagePath string @@ -117,6 +119,11 @@ func newContainerAnnotations(container *v1.Container, pod *v1.Pod, restartCount annotations[containerTerminationMessagePathLabel] = container.TerminationMessagePath annotations[containerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) + annotations[containerSidecarLabel] = "Default" + if isSidecar(pod, container.Name) { + annotations[containerSidecarLabel] = "Sidecar" + } + if pod.DeletionGracePeriodSeconds != nil { annotations[podDeletionGracePeriodLabel] = strconv.FormatInt(*pod.DeletionGracePeriodSeconds, 10) } @@ -202,6 +209,9 @@ func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedCo if containerInfo.PodTerminationGracePeriod, err = getInt64PointerFromLabel(annotations, podTerminationGracePeriodLabel); err != nil { klog.ErrorS(err, "Unable to get label value from annotations", "label", podTerminationGracePeriodLabel, "annotations", annotations) } + if getStringValueFromLabel(annotations, containerSidecarLabel) == "Sidecar" { + containerInfo.Sidecar = true + } preStopHandler := &v1.LifecycleHandler{} if found, err := getJSONObjectFromLabel(annotations, containerPreStopHandlerLabel, preStopHandler); err != nil { diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index 650a48a8e6043..ae8a18a92e4fe 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -25,7 +25,6 @@ import ( "sync" "time" - cadvisorfs "github.com/google/cadvisor/fs" cadvisorapiv2 "github.com/google/cadvisor/info/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -448,12 +447,13 @@ func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) *cad mountpoint := fsID.GetMountpoint() fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint) if err != nil { - msg := "Failed to get the info of the filesystem with mountpoint" - if err == cadvisorfs.ErrNoSuchDevice { - klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err) - } else { - klog.ErrorS(err, msg, "mountpoint", mountpoint) - } + // comment out per upstream bug https://github.com/kubernetes/kubernetes/issues/94825 + // msg := fmt.Sprintf("Failed to get the info of the filesystem with mountpoint %q: %v.", mountpoint, err) + // if err == cadvisorfs.ErrNoSuchDevice { + // klog.V(2).Info(msg) + // } else { + // klog.Error(msg) + // } return nil } return &fsInfo diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index f68d282bc6dce..079c3869a87d7 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+    http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,6 +39,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	statusutil "k8s.io/kubernetes/pkg/util/pod"
 )
 
@@ -897,3 +898,72 @@ func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
 	}
 	return false
 }
+
+// SidecarsStatus reports whether the pod has sidecars, whether all of the
+// sidecars are ready, and whether the non-sidecar containers are still in a
+// waiting state.
+type SidecarsStatus struct {
+	SidecarsPresent   bool
+	SidecarsReady     bool
+	ContainersWaiting bool
+}
+
+// NotEnoughInformationSidecarStatus is used when we don't have enough
+// information to compute the complete status.
+// This state prevents application containers from making progress,
+// but sidecars are always allowed to progress.
+var NotEnoughInformationSidecarStatus = SidecarsStatus{SidecarsPresent: true, SidecarsReady: false, ContainersWaiting: true}
+
+// GetSidecarsStatus returns the SidecarsStatus for the given pod
+func GetSidecarsStatus(pod *v1.Pod) SidecarsStatus {
+	if pod == nil {
+		klog.Infof("Pod was nil, returning not enough information sidecar status")
+		return NotEnoughInformationSidecarStatus
+	}
+	if pod.Spec.Containers == nil {
+		klog.Infof("Pod Containers was nil, returning not enough information sidecar status")
+		return NotEnoughInformationSidecarStatus
+	}
+	if pod.Status.ContainerStatuses == nil {
+		for _, container := range pod.Spec.Containers {
+			if isSidecar(pod, container.Name) {
+				klog.Infof("Pod Containers Status was nil, sidecar is present: returning not enough information status")
+				return NotEnoughInformationSidecarStatus
+			}
+		}
+		klog.Infof("Pod Containers Status was nil, no sidecar present: returning empty sidecar status")
+		return SidecarsStatus{}
+	}
+
+	sidecarsStatus := SidecarsStatus{SidecarsPresent: false, SidecarsReady: true, ContainersWaiting: true}
+	for _, container := range pod.Spec.Containers {
+		for _, status := range pod.Status.ContainerStatuses {
+			if status.Name == container.Name {
+				if pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", container.Name)] == "Sidecar" {
+					sidecarsStatus.SidecarsPresent = true
+					if !status.Ready {
+						klog.Infof("Pod %s: %s: sidecar not ready", format.Pod(pod), container.Name)
+						sidecarsStatus.SidecarsReady = false
+					} else {
+						klog.Infof("Pod %s: %s: sidecar is ready", format.Pod(pod), container.Name)
+					}
+				} else {
+					if (status.State == v1.ContainerState{}) || // For zero value, the default state is ContainerStateWaiting.
+ status.State.Waiting != nil { + // non-sidecar is waiting + klog.Infof("Pod: %s: %s: non-sidecar waiting", format.Pod(pod), container.Name) + } else { + // non-sidecar has started running + klog.Infof("Pod: %s: %s: non-sidecar started", format.Pod(pod), container.Name) + sidecarsStatus.ContainersWaiting = false + } + } + } + } + } + return sidecarsStatus +} + +func isSidecar(pod *v1.Pod, containerName string) bool { + return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar" +} diff --git a/pkg/volume/awsebs/aws_util.go b/pkg/volume/awsebs/aws_util.go index d7804f1f581d9..05251b1a1e37d 100644 --- a/pkg/volume/awsebs/aws_util.go +++ b/pkg/volume/awsebs/aws_util.go @@ -154,6 +154,8 @@ func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quant volumeOptions := &aws.VolumeOptions{ CapacityGB: requestGiB, Tags: tags, + IOPS: 3000, + Throughput: 125, } // Apply Parameters (case-insensitive). We leave validation of @@ -187,6 +189,16 @@ func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quant } case "kmskeyid": volumeOptions.KmsKeyID = v + case "iops": + volumeOptions.IOPS, err = strconv.Atoi(v) + if err != nil { + return nil, fmt.Errorf("invalid iops value %q: must be integer between 3000 and 16000: %v", v, err) + } + case "throughput": + volumeOptions.Throughput, err = strconv.Atoi(v) + if err != nil { + return nil, fmt.Errorf("invalid throughput value %q: must be integer between 125 and 1000: %v", v, err) + } case volume.VolumeParameterFSType: // Do nothing but don't make this fail default: diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go index 93ed45472b963..ac0a192026e66 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go @@ -457,6 +457,7 @@ const ( VolumeTypeSC1 = "sc1" // Throughput Optimized HDD VolumeTypeST1 = "st1" + VolumeTypeGP3 = "gp3" ) // AWS provisioning limits. @@ -479,6 +480,9 @@ type VolumeOptions struct { // fully qualified resource name to the key to use for encryption. // example: arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef KmsKeyID string + // options for gp3 + IOPS int + Throughput int } // Volumes is an interface for managing cloud-provisioned volumes @@ -2549,10 +2553,16 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, error) { var createType string var iops int64 + var throughput int64 switch volumeOptions.VolumeType { case VolumeTypeGP2, VolumeTypeSC1, VolumeTypeST1: createType = volumeOptions.VolumeType + case VolumeTypeGP3: + createType = volumeOptions.VolumeType + iops = int64(volumeOptions.IOPS) + throughput = int64(volumeOptions.Throughput) + case VolumeTypeIO1: // See http://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateVolume.html // for IOPS constraints. AWS will throw an error if IOPS per GB gets out @@ -2588,6 +2598,9 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er if iops > 0 { request.Iops = aws.Int64(iops) } + if throughput > 0 { + request.Throughput = aws.Int64(throughput) + } tags := volumeOptions.Tags tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
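
Note: the sidecar handling in this patch keys off a per-container pod annotation rather than an API field: a container named <name> is treated as a sidecar when the pod carries the annotation "sidecars.lyft.net/container-lifecycle-<name>" set to "Sidecar". The sketch below is not part of the patch; the pod and container names are made up. It only illustrates the annotation convention the kubelet-side checks assume.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isSidecar mirrors the check used throughout the patch: a container is a
// sidecar if and only if the pod carries the per-container
// "sidecars.lyft.net/container-lifecycle-<name>" annotation set to "Sidecar".
func isSidecar(pod *v1.Pod, containerName string) bool {
	if pod == nil {
		return false
	}
	return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar"
}

func main() {
	// Hypothetical pod: "envoy" opts into the sidecar lifecycle, "app" does not.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "example",
			Annotations: map[string]string{
				"sidecars.lyft.net/container-lifecycle-envoy": "Sidecar",
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{Name: "app"}, {Name: "envoy"}},
		},
	}
	for _, c := range pod.Spec.Containers {
		fmt.Printf("%s: sidecar=%v\n", c.Name, isSidecar(pod, c.Name))
	}
}

With that convention, computePodActions starts only the sidecars of a fresh pod and starts the remaining containers once GetSidecarsStatus reports the sidecars ready.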
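Note: on deletion, killContainersWithSyncResult stops the non-sidecar containers first with the full grace period and then stops the sidecars with whatever part of that budget remains. A minimal standalone sketch of that budgeting follows; the container names and the stop callback are illustrative stand-ins, not the kubelet's real API, and the real code kills each group in parallel goroutines rather than sequentially.

package main

import (
	"fmt"
	"time"
)

// stopInOrder stops non-sidecars first, then spends the remainder of the
// grace period on the sidecars, mirroring the ordering introduced in
// killContainersWithSyncResult. stop stands in for the real kill call.
func stopInOrder(nonSidecars, sidecars []string, grace time.Duration, stop func(name string, grace time.Duration)) {
	start := time.Now()
	for _, name := range nonSidecars {
		stop(name, grace)
	}
	// Sidecars only get what is left of the original budget.
	remaining := grace - time.Since(start)
	if remaining < 0 {
		remaining = 0
	}
	for _, name := range sidecars {
		stop(name, remaining)
	}
}

func main() {
	stop := func(name string, grace time.Duration) {
		fmt.Printf("stopping %s with %s grace\n", name, grace)
		time.Sleep(10 * time.Millisecond) // stand-in for waiting on the container to exit
	}
	stopInOrder([]string{"app"}, []string{"envoy"}, 30*time.Second, stop)
}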
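Note: the awsebs change defaults gp3 volumes to 3000 IOPS and 125 MiB/s throughput and accepts "iops" and "throughput" StorageClass parameters, but it only verifies that the values parse as integers. The sketch below shows how the ranges quoted in the error messages could also be enforced; the helper name and the bounds check are illustrative additions, not part of the patch.

package main

import (
	"fmt"
	"strconv"
)

// parseGP3Param parses a StorageClass parameter value and checks it against
// the bounds quoted in the patch's error messages (3000-16000 IOPS,
// 125-1000 MiB/s throughput). The bounds check is an illustration; the diff
// above only verifies that the value is an integer.
func parseGP3Param(name, value string, min, max int) (int, error) {
	v, err := strconv.Atoi(value)
	if err != nil {
		return 0, fmt.Errorf("invalid %s value %q: must be an integer between %d and %d: %v", name, value, min, max, err)
	}
	if v < min || v > max {
		return 0, fmt.Errorf("invalid %s value %d: must be between %d and %d", name, v, min, max)
	}
	return v, nil
}

func main() {
	iops, err := parseGP3Param("iops", "4000", 3000, 16000)
	fmt.Println(iops, err) // 4000 <nil>

	_, err = parseGP3Param("throughput", "2000", 125, 1000)
	fmt.Println(err) // out of range
}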