Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions cmd/cluster-bootstrap/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ var (
}

startOpts struct {
assetDir string
podManifestPath string
strict bool
requiredPods []string
assetDir string
podManifestPath string
strict bool
requiredPods []string
waitForTearDownEvent string
}
)

Expand All @@ -41,25 +42,22 @@ func init() {
cmdStart.Flags().StringVar(&startOpts.podManifestPath, "pod-manifest-path", "/etc/kubernetes/manifests", "The location where the kubelet is configured to look for static pod manifests.")
cmdStart.Flags().BoolVar(&startOpts.strict, "strict", false, "Strict mode will cause start command to exit early if any manifests in the asset directory cannot be created.")
cmdStart.Flags().StringSliceVar(&startOpts.requiredPods, "required-pods", defaultRequiredPods, "List of pods with their namespace (written as <namespace>/<pod-name>) that are required to be running and ready before the start command does the pivot.")
cmdStart.Flags().StringVar(&startOpts.waitForTearDownEvent, "tear-down-event", "", "if this optional event name of the form <ns>/<event-name> is given, the event is waited for before tearing down the bootstrap control plane")
}

func runCmdStart(cmd *cobra.Command, args []string) error {
bk, err := start.NewStartCommand(start.Config{
AssetDir: startOpts.assetDir,
PodManifestPath: startOpts.podManifestPath,
Strict: startOpts.strict,
RequiredPods: startOpts.requiredPods,
AssetDir: startOpts.assetDir,
PodManifestPath: startOpts.podManifestPath,
Strict: startOpts.strict,
RequiredPods: startOpts.requiredPods,
WaitForTearDownEvent: startOpts.waitForTearDownEvent,
})
if err != nil {
return err
}

err = bk.Run()
if err != nil {
// Always report errors.
start.UserOutput("Error: %v\n", err)
}
return err
return bk.Run()
}

func validateStartOpts(cmd *cobra.Command, args []string) error {
Expand Down
15 changes: 3 additions & 12 deletions pkg/start/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,18 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
crdRolloutDuration = 1 * time.Second
crdRolloutTimeout = 2 * time.Minute
)

func createAssets(config clientcmd.ClientConfig, manifestDir string, timeout time.Duration, strict bool) error {
func createAssets(c *rest.Config, manifestDir string, timeout time.Duration, strict bool) error {
if _, err := os.Stat(manifestDir); os.IsNotExist(err) {
UserOutput(fmt.Sprintf("WARNING: %v does not exist, not creating any self-hosted assets.\n", manifestDir))
return nil
}
c, err := config.ClientConfig()
if err != nil {
return err
}
creater, err := newCreater(c, strict)
if err != nil {
return err
Expand All @@ -54,7 +49,7 @@ func createAssets(config clientcmd.ClientConfig, manifestDir string, timeout tim
}

upFn := func() (bool, error) {
if err := apiTest(config); err != nil {
if err := apiTest(c); err != nil {
glog.Warningf("Unable to determine api-server readiness: %v", err)
return false, nil
}
Expand Down Expand Up @@ -84,11 +79,7 @@ func createAssets(config clientcmd.ClientConfig, manifestDir string, timeout tim
return nil
}

func apiTest(c clientcmd.ClientConfig) error {
config, err := c.ClientConfig()
if err != nil {
return err
}
func apiTest(config *rest.Config) error {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return err
Expand Down
107 changes: 86 additions & 21 deletions pkg/start/start.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,60 @@
package start

import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

const assetTimeout = 20 * time.Minute
const (
// how long we wait until the bootstrap pods to be running
bootstrapPodsRunningTimeout = 20 * time.Minute
)

type Config struct {
AssetDir string
PodManifestPath string
Strict bool
RequiredPods []string
AssetDir string
PodManifestPath string
Strict bool
RequiredPods []string
WaitForTearDownEvent string
}

type startCommand struct {
podManifestPath string
assetDir string
strict bool
requiredPods []string
podManifestPath string
assetDir string
strict bool
requiredPods []string
waitForTearDownEvent string
}

func NewStartCommand(config Config) (*startCommand, error) {
return &startCommand{
assetDir: config.AssetDir,
podManifestPath: config.PodManifestPath,
strict: config.Strict,
requiredPods: config.RequiredPods,
assetDir: config.AssetDir,
podManifestPath: config.PodManifestPath,
strict: config.Strict,
requiredPods: config.RequiredPods,
waitForTearDownEvent: config.WaitForTearDownEvent,
}, nil
}

func (b *startCommand) Run() error {
// TODO(diegs): create and share a single client rather than the kubeconfig once all uses of it
// are migrated to client-go.
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: filepath.Join(b.assetDir, assetPathAdminKubeConfig)},
&clientcmd.ConfigOverrides{})
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(b.assetDir, assetPathAdminKubeConfig))
if err != nil {
return err
}
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}

bcp := newBootstrapControlPlane(b.assetDir, b.podManifestPath)

Expand All @@ -49,7 +65,6 @@ func (b *startCommand) Run() error {
}
}()

var err error
defer func() {
// Always report errors.
if err != nil {
Expand All @@ -61,14 +76,34 @@ func (b *startCommand) Run() error {
return err
}

if err = createAssets(kubeConfig, filepath.Join(b.assetDir, assetPathManifests), assetTimeout, b.strict); err != nil {
if err = createAssets(restConfig, filepath.Join(b.assetDir, assetPathManifests), bootstrapPodsRunningTimeout, b.strict); err != nil {
return err
}

if err = waitUntilPodsRunning(kubeConfig, b.requiredPods, assetTimeout); err != nil {
if err = waitUntilPodsRunning(client, b.requiredPods, bootstrapPodsRunningTimeout); err != nil {
return err
}

// notify installer that we are ready to tear down the temporary bootstrap control plane
UserOutput("Sending bootstrap-success event.")
if _, err := client.CoreV1().Events("kube-system").Create(makeBootstrapSuccessEvent("kube-system", "bootstrap-success")); err != nil && !apierrors.IsAlreadyExists(err) {
return err
}

// optionally wait for tear down event coming from the installer. This is necessary to
// remove the bootstrap node from the AWS load balancer.
if len(b.waitForTearDownEvent) != 0 {
ss := strings.Split(b.waitForTearDownEvent, "/")
if len(ss) != 2 {
return fmt.Errorf("tear down event name of format <namespace>/<event-name> expected, got: %q", b.waitForTearDownEvent)
}
ns, name := ss[0], ss[1]
Comment thread
sttts marked this conversation as resolved.
if err := waitForEvent(context.TODO(), client, ns, name); err != nil {
return err
}
UserOutput("Got %s event.", b.waitForTearDownEvent)
}

return nil
}

Expand All @@ -79,3 +114,33 @@ func (b *startCommand) Run() error {
func UserOutput(format string, a ...interface{}) {
fmt.Printf(format, a...)
}

func waitForEvent(ctx context.Context, client kubernetes.Interface, ns, name string) error {
return wait.PollImmediateUntil(time.Second, func() (done bool, err error) {
if _, err := client.CoreV1().Events(ns).Get(name, metav1.GetOptions{}); err != nil && apierrors.IsNotFound(err) {
return false, nil
} else if err != nil {
UserOutput("Error waiting for %s/%s event: %v", ns, name, err)
return false, nil
}
return true, nil
}, ctx.Done())
}

func makeBootstrapSuccessEvent(ns, name string) *corev1.Event {
currentTime := metav1.Time{Time: time.Now()}
event := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
InvolvedObject: corev1.ObjectReference{
Namespace: ns,
},
Message: "Required control plane pods have been created",
Count: 1,
FirstTimestamp: currentTime,
LastTimestamp: currentTime,
}
return event
}
13 changes: 2 additions & 11 deletions pkg/start/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)

func waitUntilPodsRunning(c clientcmd.ClientConfig, pods []string, timeout time.Duration) error {
func waitUntilPodsRunning(c kubernetes.Interface, pods []string, timeout time.Duration) error {
sc, err := newStatusController(c, pods)
if err != nil {
return err
Expand All @@ -39,15 +38,7 @@ type statusController struct {
lastPodPhases map[string]*podStatus
}

func newStatusController(c clientcmd.ClientConfig, pods []string) (*statusController, error) {
config, err := c.ClientConfig()
if err != nil {
return nil, err
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
func newStatusController(client kubernetes.Interface, pods []string) (*statusController, error) {
return &statusController{client: client, watchPods: pods}, nil
}

Expand Down