diff --git a/cli/command/stack/kubernetes/deploy.go b/cli/command/stack/kubernetes/deploy.go index f8558e59a15e..5d943c877320 100644 --- a/cli/command/stack/kubernetes/deploy.go +++ b/cli/command/stack/kubernetes/deploy.go @@ -31,10 +31,6 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error { configMaps := composeClient.ConfigMaps() secrets := composeClient.Secrets() services := composeClient.Services() - pods := composeClient.Pods() - watcher := DeployWatcher{ - Pods: pods, - } // Parse the compose file stack, cfg, err := LoadStack(opts.Namespace, opts.Composefiles) @@ -73,10 +69,17 @@ func RunDeploy(dockerCli *KubeCli, opts options.Deploy) error { fmt.Fprintln(cmdOut, "Waiting for the stack to be stable and running...") - <-watcher.Watch(stack, serviceNames(cfg)) + pods := composeClient.Pods() + watcher := &deployWatcher{ + out: dockerCli.Out(), + stacks: stacks, + pods: pods, + } + if err := watcher.Watch(stack, serviceNames(cfg)); err != nil { + return err + } fmt.Fprintf(cmdOut, "Stack %s is stable and running\n\n", stack.Name) - // TODO: fmt.Fprintf(cmdOut, "Read the logs with:\n $ %s stack logs %s\n", filepath.Base(os.Args[0]), stack.Name) return nil } diff --git a/cli/command/stack/kubernetes/watcher.go b/cli/command/stack/kubernetes/watcher.go index eef2ec82819a..0e0df8f03b3c 100644 --- a/cli/command/stack/kubernetes/watcher.go +++ b/cli/command/stack/kubernetes/watcher.go @@ -2,41 +2,75 @@ package kubernetes import ( "fmt" + "io" "time" + composev1beta1 "github.com/docker/cli/kubernetes/client/clientset_generated/clientset/typed/compose/v1beta1" apiv1beta1 "github.com/docker/cli/kubernetes/compose/v1beta1" "github.com/docker/cli/kubernetes/labels" + "github.com/pkg/errors" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) // DeployWatcher watches a stack deployement -type DeployWatcher struct { - Pods corev1.PodInterface +type deployWatcher struct { + pods corev1.PodInterface + stacks composev1beta1.StackInterface + out io.Writer } // Watch watches a stuck deployement and return a chan that will holds the state of the stack -func (w DeployWatcher) Watch(stack *apiv1beta1.Stack, serviceNames []string) chan bool { - stop := make(chan bool) +func (w *deployWatcher) Watch(stack *apiv1beta1.Stack, serviceNames []string) error { + err := make(chan error) - go w.waitForPods(stack.Name, serviceNames, stop) + go w.watchStackStatus(stack.Name, err) + go w.waitForPods(stack.Name, serviceNames, err) - return stop + return <-err } -func (w DeployWatcher) waitForPods(stackName string, serviceNames []string, stop chan bool) { - starts := map[string]int32{} +func (w *deployWatcher) watchStackStatus(stackname string, e chan error) { + + watcher, err := w.stacks.Watch(metav1.ListOptions{ + LabelSelector: "com.docker.stack.namespace=" + stackname, + }) + if err != nil { + e <- err + return + } for { - time.Sleep(1 * time.Second) + select { + case ev := <-watcher.ResultChan(): + if ev.Type != watch.Added && ev.Type != watch.Modified { + continue + } + stack := ev.Object.(*apiv1beta1.Stack) + if stack.Status.Phase == apiv1beta1.StackFailure { + e <- errors.Errorf("stack %s failed with status %s", stackname, stack.Status.Phase) + return + } + case <-e: + return + } + } +} + +func (w *deployWatcher) waitForPods(stackName string, serviceNames []string, e chan error) { + starts := map[string]int32{} + t := time.NewTicker(250 * time.Millisecond) + defer t.Stop() - list, err := w.Pods.List(metav1.ListOptions{ + for { + list, err := w.pods.List(metav1.ListOptions{ LabelSelector: labels.SelectorForStack(stackName), IncludeUninitialized: true, }) if err != nil { - stop <- true + e <- err return } @@ -60,7 +94,13 @@ func (w DeployWatcher) waitForPods(stackName string, serviceNames []string, stop } if allReady(list.Items, serviceNames) { - stop <- true + e <- nil + return + } + select { + case <-t.C: + continue + case <-e: return } }