From 889435a18b0c41bf4c020072920c80dd6788e2ba Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 5 Jun 2025 10:58:12 +0200 Subject: [PATCH 1/2] (refactoring) introduce monitor to manage containers events and application termination Signed-off-by: Nicolas De Loof --- cmd/formatter/logs.go | 4 - pkg/api/api.go | 31 +++--- pkg/compose/attach.go | 46 ++++---- pkg/compose/containers.go | 6 - pkg/compose/hook.go | 10 +- pkg/compose/logs.go | 67 +++++------- pkg/compose/logs_test.go | 2 - pkg/compose/monitor.go | 211 +++++++++++++++++++++++++++++++++++ pkg/compose/printer.go | 81 ++------------ pkg/compose/start.go | 225 -------------------------------------- pkg/compose/up.go | 86 +++++++++++---- pkg/compose/watch.go | 1 - pkg/compose/watch_test.go | 3 - pkg/prompt/prompt_mock.go | 2 +- 14 files changed, 352 insertions(+), 423 deletions(-) create mode 100644 pkg/compose/monitor.go diff --git a/cmd/formatter/logs.go b/cmd/formatter/logs.go index 0e2d206c8dd..2faa77b65e0 100644 --- a/cmd/formatter/logs.go +++ b/cmd/formatter/logs.go @@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix } } -func (l *logConsumer) Register(name string) { - l.register(name) -} - func (l *logConsumer) register(name string) *presenter { var p *presenter root, _, found := strings.Cut(name, " ") diff --git a/pkg/api/api.go b/pkg/api/api.go index 359893bb341..0bec86a6e13 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -649,7 +649,6 @@ type LogConsumer interface { Log(containerName, message string) Err(containerName, message string) Status(container, msg string) - Register(container string) } // ContainerEventListener is a callback to process ContainerEvent from services @@ -657,16 +656,18 @@ type ContainerEventListener func(event ContainerEvent) // ContainerEvent notify an event has been collected on source container implementing Service type ContainerEvent struct { - Type int - // Container is the name of the container _without the project prefix_. + Type int + Time int64 + Container *ContainerSummary + // Source is the name of the container _without the project prefix_. // // This is only suitable for display purposes within Compose, as it's // not guaranteed to be unique across services. - Container string - ID string - Service string - Line string - // ContainerEventExit only + Source string + ID string + Service string + Line string + // ContainerEventExited only ExitCode int Restarting bool } @@ -676,17 +677,19 @@ const ( ContainerEventLog = iota // ContainerEventErr is a ContainerEvent of type log on stderr. Line is set ContainerEventErr - // ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container - ContainerEventAttach + // ContainerEventStarted let consumer know a container has been started + ContainerEventStarted + // ContainerEventRestarted let consumer know a container has been restarted + ContainerEventRestarted // ContainerEventStopped is a ContainerEvent of type stopped. ContainerEventStopped + // ContainerEventCreated let consumer know a new container has been created + ContainerEventCreated // ContainerEventRecreated let consumer know container stopped but his being replaced ContainerEventRecreated - // ContainerEventExit is a ContainerEvent of type exit. ExitCode is set - ContainerEventExit + // ContainerEventExited is a ContainerEvent of type exit. ExitCode is set + ContainerEventExited // UserCancel user cancelled compose up, we are stopping containers - UserCancel - // HookEventLog is a ContainerEvent of type log on stdout by service hook HookEventLog ) diff --git a/pkg/compose/attach.go b/pkg/compose/attach.go index 8c17e056ad1..897c4733170 100644 --- a/pkg/compose/attach.go +++ b/pkg/compose/attach.go @@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis } func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error { - serviceName := container.Labels[api.ServiceLabel] - containerName := getContainerNameWithoutProject(container) - - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: containerName, - ID: container.ID, - Service: serviceName, - }) + service := container.Labels[api.ServiceLabel] + name := getContainerNameWithoutProject(container) + return s.doAttachContainer(ctx, service, container.ID, name, listener) +} + +func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error { + inspect, err := s.apiClient().ContainerInspect(ctx, id) + if err != nil { + return err + } wOut := utils.GetWriter(func(line string) { listener(api.ContainerEvent{ - Type: api.ContainerEventLog, - Container: containerName, - ID: container.ID, - Service: serviceName, - Line: line, + Type: api.ContainerEventLog, + Source: name, + ID: id, + Service: service, + Line: line, }) }) wErr := utils.GetWriter(func(line string) { listener(api.ContainerEvent{ - Type: api.ContainerEventErr, - Container: containerName, - ID: container.ID, - Service: serviceName, - Line: line, + Type: api.ContainerEventErr, + Source: name, + ID: id, + Service: service, + Line: line, }) }) - inspect, err := s.apiClient().ContainerInspect(ctx, container.ID) - if err != nil { - return err - } - - _, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr) + _, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr) return err } diff --git a/pkg/compose/containers.go b/pkg/compose/containers.go index ebf70d01301..598cc2a234e 100644 --- a/pkg/compose/containers.go +++ b/pkg/compose/containers.go @@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate { } } -func isRunning() containerPredicate { - return func(c container.Summary) bool { - return c.State == "running" - } -} - // isOrphaned is a predicate to select containers without a matching service definition in compose project func isOrphaned(project *types.Project) containerPredicate { services := append(project.ServiceNames(), project.DisabledServiceNames()...) diff --git a/pkg/compose/hook.go b/pkg/compose/hook.go index 6bd3f84bf93..dd02de640a3 100644 --- a/pkg/compose/hook.go +++ b/pkg/compose/hook.go @@ -32,11 +32,11 @@ import ( func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error { wOut := utils.GetWriter(func(line string) { listener(api.ContainerEvent{ - Type: api.HookEventLog, - Container: getContainerNameWithoutProject(ctr) + " ->", - ID: ctr.ID, - Service: service.Name, - Line: line, + Type: api.HookEventLog, + Source: getContainerNameWithoutProject(ctr) + " ->", + ID: ctr.ID, + Service: service.Name, + Line: line, }) }) defer wOut.Close() //nolint:errcheck diff --git a/pkg/compose/logs.go b/pkg/compose/logs.go index b9a108fc3d3..2976dace4b2 100644 --- a/pkg/compose/logs.go +++ b/pkg/compose/logs.go @@ -62,7 +62,7 @@ func (s *composeService) Logs( eg, ctx := errgroup.WithContext(ctx) for _, ctr := range containers { eg.Go(func() error { - err := s.logContainers(ctx, consumer, ctr, options) + err := s.logContainer(ctx, consumer, ctr, options) if errdefs.IsNotImplemented(err) { logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error()) return nil @@ -72,34 +72,21 @@ func (s *composeService) Logs( } if options.Follow { - containers = containers.filter(isRunning()) printer := newLogPrinter(consumer) - eg.Go(func() error { - _, err := printer.Run(api.CascadeIgnore, "", nil) - return err - }) - - for _, c := range containers { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(c), - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) - } + eg.Go(printer.Run) - eg.Go(func() error { - err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(c), - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) + monitor := newMonitor(s.apiClient(), options.Project) + monitor.withListener(func(event api.ContainerEvent) { + if event.Type == api.ContainerEventStarted { eg.Go(func() error { - err := s.logContainers(ctx, consumer, c, api.LogOptions{ + ctr, err := s.apiClient().ContainerInspect(ctx, event.ID) + if err != nil { + return err + } + + err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{ Follow: options.Follow, - Since: t.Format(time.RFC3339Nano), + Since: time.Unix(0, event.Time).Format(time.RFC3339Nano), Until: options.Until, Tail: options.Tail, Timestamps: options.Timestamps, @@ -110,31 +97,28 @@ func (s *composeService) Logs( } return err }) - return nil - }, func(c container.Summary, t time.Time) error { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: "", // actual name will be set by start event - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) - return nil - }) - printer.Stop() - return err + } + }) + eg.Go(func() error { + defer printer.Stop() + return monitor.Start(ctx) }) } return eg.Wait() } -func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error { - cnt, err := s.apiClient().ContainerInspect(ctx, c.ID) +func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error { + ctr, err := s.apiClient().ContainerInspect(ctx, c.ID) if err != nil { return err } + name := getContainerNameWithoutProject(c) + return s.doLogContainer(ctx, consumer, name, ctr, options) +} - r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{ +func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error { + r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: options.Follow, @@ -148,11 +132,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons } defer r.Close() //nolint:errcheck - name := getContainerNameWithoutProject(c) w := utils.GetWriter(func(line string) { consumer.Log(name, line) }) - if cnt.Config.Tty { + if ctr.Config.Tty { _, err = io.Copy(w, r) } else { _, err = stdcopy.StdCopy(w, w, r) diff --git a/pkg/compose/logs_test.go b/pkg/compose/logs_test.go index 46893e636c6..955b5e770d5 100644 --- a/pkg/compose/logs_test.go +++ b/pkg/compose/logs_test.go @@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) { func (l *testLogConsumer) Status(containerName, msg string) {} -func (l *testLogConsumer) Register(containerName string) {} - func (l *testLogConsumer) LogsForContainer(containerName string) []string { l.mu.Lock() defer l.mu.Unlock() diff --git a/pkg/compose/monitor.go b/pkg/compose/monitor.go new file mode 100644 index 00000000000..624f01d5d47 --- /dev/null +++ b/pkg/compose/monitor.go @@ -0,0 +1,211 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "strconv" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/containerd/errdefs" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + "github.com/sirupsen/logrus" + + "github.com/docker/compose/v2/pkg/api" + "github.com/docker/compose/v2/pkg/utils" +) + +type monitor struct { + api client.APIClient + project *types.Project + // services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command + services map[string]bool + listeners []api.ContainerEventListener +} + +func newMonitor(api client.APIClient, project *types.Project) *monitor { + services := map[string]bool{} + if project != nil { + for name := range project.Services { + services[name] = true + } + } + return &monitor{ + api: api, + project: project, + services: services, + } +} + +// Start runs monitor to detect application events and return after termination +// +//nolint:gocyclo +func (c *monitor) Start(ctx context.Context) error { + // collect initial application container + initialState, err := c.api.ContainerList(ctx, container.ListOptions{ + All: true, + Filters: filters.NewArgs( + projectFilter(c.project.Name), + oneOffFilter(false), + hasConfigHashLabel(), + ), + }) + if err != nil { + return err + } + + // containers is the set if container IDs the application is based on + containers := utils.Set[string]{} + for _, ctr := range initialState { + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) + } + } + + restarting := utils.Set[string]{} + + evtCh, errCh := c.api.Events(ctx, events.ListOptions{ + Filters: filters.NewArgs( + filters.Arg("type", "container"), + projectFilter(c.project.Name)), + }) + for { + select { + case <-ctx.Done(): + return nil + case err := <-errCh: + return err + case event := <-evtCh: + if !c.services[event.Actor.Attributes[api.ServiceLabel]] { + continue + } + ctr, err := c.getContainerSummary(event) + if err != nil { + return err + } + + switch event.Action { + case events.ActionCreate: + containers.Add(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated)) + } + logrus.Debugf("container %s created", ctr.Name) + case events.ActionStart: + restarted := restarting.Has(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { + e.Restarting = restarted + })) + } + if restarted { + logrus.Debugf("container %s restarted", ctr.Name) + } else { + logrus.Debugf("container %s started", ctr.Name) + } + containers.Add(ctr.ID) + case events.ActionRestart: + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted)) + } + logrus.Debugf("container %s restarted", ctr.Name) + case events.ActionStop: + // when a container is in restarting phase, and we stop the application (abort-on-container-exit) + // we won't get any additional start+die events, just this stop as a proof container is down + logrus.Debugf("container %s stopped", ctr.Name) + containers.Remove(ctr.ID) + case events.ActionDie: + logrus.Debugf("container %s exited with code %d", ctr.Name, ctr.ExitCode) + inspect, err := c.api.ContainerInspect(ctx, event.Actor.ID) + if errdefs.IsNotFound(err) { + // Source is already removed + } else if err != nil { + return err + } + + if inspect.State != nil && inspect.State.Restarting || inspect.State.Running { + // State.Restarting is set by engine when container is configured to restart on exit + // on ContainerRestart it doesn't (see https://github.com/moby/moby/issues/45538) + // container state still is reported as "running" + logrus.Debugf("container %s is restarting", ctr.Name) + restarting.Add(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited, func(e *api.ContainerEvent) { + e.Restarting = true + })) + } + } else { + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited)) + } + containers.Remove(ctr.ID) + } + } + } + if len(containers) == 0 { + return nil + } + } +} + +func newContainerEvent(timeNano int64, ctr *api.ContainerSummary, eventType int, opts ...func(e *api.ContainerEvent)) api.ContainerEvent { + name := ctr.Name + defaultName := getDefaultContainerName(ctr.Project, ctr.Labels[api.ServiceLabel], ctr.Labels[api.ContainerNumberLabel]) + if name == defaultName { + // remove project- prefix + name = name[len(ctr.Project)+1:] + } + + event := api.ContainerEvent{ + Type: eventType, + Container: ctr, + Time: timeNano, + Source: name, + ID: ctr.ID, + Service: ctr.Service, + ExitCode: ctr.ExitCode, + } + for _, opt := range opts { + opt(&event) + } + return event +} + +func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSummary, error) { + ctr := &api.ContainerSummary{ + ID: event.Actor.ID, + Name: event.Actor.Attributes["name"], + Project: c.project.Name, + Service: event.Actor.Attributes[api.ServiceLabel], + Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us + } + if ec, ok := event.Actor.Attributes["exitCode"]; ok { + exitCode, err := strconv.Atoi(ec) + if err != nil { + return nil, err + } + ctr.ExitCode = exitCode + } + return ctr, nil +} + +func (c *monitor) withListener(listener api.ContainerEventListener) { + c.listeners = append(c.listeners, listener) +} diff --git a/pkg/compose/printer.go b/pkg/compose/printer.go index 338312faebb..55084270ab5 100644 --- a/pkg/compose/printer.go +++ b/pkg/compose/printer.go @@ -26,8 +26,7 @@ import ( // logPrinter watch application containers and collect their logs type logPrinter interface { HandleEvent(event api.ContainerEvent) - Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error) - Cancel() + Run() error Stop() } @@ -49,11 +48,6 @@ func newLogPrinter(consumer api.LogConsumer) logPrinter { return &printer } -func (p *printer) Cancel() { - // note: HandleEvent is used to ensure this doesn't deadlock - p.HandleEvent(api.ContainerEvent{Type: api.UserCancel}) -} - func (p *printer) Stop() { p.stop.Do(func() { close(p.stopCh) @@ -78,82 +72,25 @@ func (p *printer) HandleEvent(event api.ContainerEvent) { } } -//nolint:gocyclo -func (p *printer) Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error) { - var ( - aborting bool - exitCode int - ) +func (p *printer) Run() error { defer p.Stop() // containers we are tracking. Use true when container is running, false after we receive a stop|die signal - containers := map[string]bool{} for { select { case <-p.stopCh: - return exitCode, nil + return nil case event := <-p.queue: - container, id := event.Container, event.ID switch event.Type { - case api.UserCancel: - aborting = true - case api.ContainerEventAttach: - if attached, ok := containers[id]; ok && attached { - continue - } - containers[id] = true - p.consumer.Register(container) - case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated: - if !aborting && containers[id] { - p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode)) - if event.Type == api.ContainerEventRecreated { - p.consumer.Status(container, "has been recreated") - } - } - containers[id] = false - if !event.Restarting { - delete(containers, id) - } - - if cascade == api.CascadeStop { - if !aborting { - aborting = true - err := stopFn() - if err != nil { - return 0, err - } - } - } - if event.Type == api.ContainerEventExit { - if cascade == api.CascadeFail && event.ExitCode != 0 { - exitCodeFrom = event.Service - if !aborting { - aborting = true - err := stopFn() - if err != nil { - return 0, err - } - } - } - if cascade == api.CascadeStop && exitCodeFrom == "" { - exitCodeFrom = event.Service - } - } - - if exitCodeFrom == event.Service && (event.Type == api.ContainerEventExit || event.Type == api.ContainerEventStopped) { - // Container was interrupted or exited, let's capture exit code - exitCode = event.ExitCode - } - if len(containers) == 0 { - // Last container terminated, done - return exitCode, nil + case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted: + p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) + if event.Type == api.ContainerEventRecreated { + p.consumer.Status(event.Source, "has been recreated") } case api.ContainerEventLog, api.HookEventLog: - p.consumer.Log(container, event.Line) + p.consumer.Log(event.Source, event.Line) case api.ContainerEventErr: - if !aborting { - p.consumer.Err(container, event.Line) - } + p.consumer.Err(event.Source, event.Line) } } } diff --git a/pkg/compose/start.go b/pkg/compose/start.go index a18487b50f1..1d1c264cfd5 100644 --- a/pkg/compose/start.go +++ b/pkg/compose/start.go @@ -20,14 +20,10 @@ import ( "context" "errors" "fmt" - "slices" "strings" - "time" - cerrdefs "github.com/containerd/errdefs" "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/progress" - "github.com/docker/compose/v2/pkg/utils" containerType "github.com/docker/docker/api/types/container" "github.com/compose-spec/compose-go/v2/types" @@ -66,48 +62,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options if err != nil { return err } - - eg.Go(func() error { - // it's possible to have a required service whose log output is not desired - // (i.e. it's not in the attach set), so watch everything and then filter - // calls to attach; this ensures that `watchContainers` blocks until all - // required containers have exited, even if their output is not being shown - attachTo := utils.NewSet[string](options.AttachTo...) - required := utils.NewSet[string](options.Services...) - toWatch := attachTo.Union(required).Elements() - - containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, toWatch...) - if err != nil { - return err - } - - // N.B. this uses the parent context (instead of attachCtx) so that the watch itself can - // continue even if one of the log streams fails - return s.watchContainers(ctx, project.Name, toWatch, required.Elements(), listener, containers, - func(ctr containerType.Summary, _ time.Time) error { - svc := ctr.Labels[api.ServiceLabel] - if attachTo.Has(svc) { - return s.attachContainer(attachCtx, ctr, listener) - } - - // HACK: simulate an "attach" event - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(ctr), - ID: ctr.ID, - Service: svc, - }) - return nil - }, func(ctr containerType.Summary, _ time.Time) error { - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: "", // actual name will be set by start event - ID: ctr.ID, - Service: ctr.Labels[api.ServiceLabel], - }) - return nil - }) - }) } var containers Containers @@ -173,182 +127,3 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project) } return ServiceConditionRunningOrHealthy } - -type containerWatchFn func(ctr containerType.Summary, t time.Time) error - -// watchContainers uses engine events to capture container start/die and notify ContainerEventListener -func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo - projectName string, services, required []string, - listener api.ContainerEventListener, containers Containers, onStart, onRecreate containerWatchFn, -) error { - if len(containers) == 0 { - return nil - } - if len(required) == 0 { - required = services - } - - unexpected := utils.NewSet[string](required...).Diff(utils.NewSet[string](services...)) - if len(unexpected) != 0 { - return fmt.Errorf(`required service(s) "%s" not present in watched service(s) "%s"`, - strings.Join(unexpected.Elements(), ", "), - strings.Join(services, ", ")) - } - - // predicate to tell if a container we receive event for should be considered or ignored - ofInterest := func(c containerType.Summary) bool { - if len(services) > 0 { - // we only watch some services - return slices.Contains(services, c.Labels[api.ServiceLabel]) - } - return true - } - - // predicate to tell if a container we receive event for should be watched until termination - isRequired := func(c containerType.Summary) bool { - if len(services) > 0 && len(required) > 0 { - // we only watch some services - return slices.Contains(required, c.Labels[api.ServiceLabel]) - } - return true - } - - var ( - expected = utils.NewSet[string]() - watched = map[string]int{} - replaced []string - ) - for _, c := range containers { - if isRequired(c) { - expected.Add(c.ID) - } - watched[c.ID] = 0 - } - - ctx, stop := context.WithCancel(ctx) - err := s.Events(ctx, projectName, api.EventsOptions{ - Services: services, - Consumer: func(event api.Event) error { - defer func() { - // after consuming each event, check to see if we're done - if len(expected) == 0 { - stop() - } - }() - inspected, err := s.apiClient().ContainerInspect(ctx, event.Container) - if err != nil { - if cerrdefs.IsNotFound(err) { - // it's possible to get "destroy" or "kill" events but not - // be able to inspect in time before they're gone from the - // API, so just remove the watch without erroring - delete(watched, event.Container) - expected.Remove(event.Container) - return nil - } - return err - } - container := containerType.Summary{ - ID: inspected.ID, - Names: []string{inspected.Name}, - Labels: inspected.Config.Labels, - } - name := getContainerNameWithoutProject(container) - service := container.Labels[api.ServiceLabel] - switch event.Status { - case "stop": - if inspected.State.Running { - // on sync+restart action the container stops -> dies -> start -> restart - // we do not want to stop the current container, we want to restart it - return nil - } - if _, ok := watched[container.ID]; ok { - eType := api.ContainerEventStopped - if slices.Contains(replaced, container.ID) { - replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID }) - eType = api.ContainerEventRecreated - } - listener(api.ContainerEvent{ - Type: eType, - Container: name, - ID: container.ID, - Service: service, - ExitCode: inspected.State.ExitCode, - }) - } - - delete(watched, container.ID) - expected.Remove(container.ID) - case "die": - restarted := watched[container.ID] - watched[container.ID] = restarted + 1 - // Container terminated. - willRestart := inspected.State.Restarting - if inspected.State.Running { - // on sync+restart action inspected.State.Restarting is false, - // however the container is already running before it restarts - willRestart = true - } - - eType := api.ContainerEventExit - if slices.Contains(replaced, container.ID) { - replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID }) - eType = api.ContainerEventRecreated - } - - listener(api.ContainerEvent{ - Type: eType, - Container: name, - ID: container.ID, - Service: service, - ExitCode: inspected.State.ExitCode, - Restarting: willRestart, - }) - - if !willRestart { - // we're done with this one - delete(watched, container.ID) - expected.Remove(container.ID) - } - case "start": - count, ok := watched[container.ID] - mustAttach := ok && count > 0 // Container restarted, need to re-attach - if !ok { - // A new container has just been added to service by scale - watched[container.ID] = 0 - expected.Add(container.ID) - mustAttach = true - } - if mustAttach { - // Container restarted, need to re-attach - err := onStart(container, event.Timestamp) - if err != nil { - return err - } - } - case "create": - if id, ok := container.Labels[api.ContainerReplaceLabel]; ok { - replaced = append(replaced, id) - err = onRecreate(container, event.Timestamp) - if err != nil { - return err - } - if expected.Has(id) { - expected.Add(inspected.ID) - expected.Add(container.ID) - } - watched[container.ID] = 1 - } else if ofInterest(container) { - watched[container.ID] = 1 - if isRequired(container) { - expected.Add(container.ID) - } - } - } - return nil - }, - }) - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } - return err -} diff --git a/pkg/compose/up.go b/pkg/compose/up.go index dda37f68e4d..85fbe1ef669 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -159,24 +159,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } }) - var exitCode int - eg.Go(func() error { - code, err := printer.Run(options.Start.OnExit, options.Start.ExitCodeFrom, func() error { - _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") - eg.Go(func() error { - return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { - return s.stop(ctx, project.Name, api.StopOptions{ - Services: options.Create.Services, - Project: project, - }, printer.HandleEvent) - }, s.stdinfo(), logConsumer) - }) - return nil - }) - exitCode = code - return err - }) - if options.Start.Watch && watcher != nil { err = watcher.Start(ctx) if err != nil { @@ -184,17 +166,75 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } } + monitor := newMonitor(s.apiClient(), project) + monitor.withListener(printer.HandleEvent) + + var exitCode int + if options.Start.OnExit != api.CascadeIgnore { + once := true + // detect first container to exit to trigger application shutdown + monitor.withListener(func(event api.ContainerEvent) { + if once && event.Type == api.ContainerEventExited { + exitCode = event.ExitCode + printer.Stop() + _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") + eg.Go(func() error { + return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { + return s.stop(ctx, project.Name, api.StopOptions{ + Services: options.Create.Services, + Project: project, + }, printer.HandleEvent) + }, s.stdinfo(), logConsumer) + }) + once = false + } + }) + } + + if options.Start.ExitCodeFrom != "" { + once := true + // capture exit code from first container to exit with selected service + monitor.withListener(func(event api.ContainerEvent) { + if once && event.Type == api.ContainerEventExited && event.Service == options.Start.ExitCodeFrom { + exitCode = event.ExitCode + once = false + } + }) + } + + monitor.withListener(func(event api.ContainerEvent) { + mustAttach := false + switch event.Type { + case api.ContainerEventCreated: + // A container has been added to the application (scale) + mustAttach = true + case api.ContainerEventStarted: + // A container is restarting - need to re-attach + mustAttach = event.Restarting + } + if mustAttach { + eg.Go(func() error { + // FIXME as container already started, we might miss the very first logs + return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + }) + } + }) + + eg.Go(func() error { + err := monitor.Start(ctx) + fmt.Println("monitor complete") + // Signal for the signal-handler goroutines to stop + close(doneCh) + printer.Stop() + return err + }) + // We use the parent context without cancellation as we manage sigterm to stop the stack err = s.start(context.WithoutCancel(ctx), project.Name, options.Start, printer.HandleEvent) if err != nil && !isTerminated.Load() { // Ignore error if the process is terminated return err } - // Signal for the signal-handler goroutines to stop - close(doneCh) - - printer.Stop() - err = eg.Wait().ErrorOrNil() if exitCode != 0 { errMsg := "" diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 80ed42acd1c..f8edf3418ec 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -192,7 +192,6 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti return nil, err } eg, ctx := errgroup.WithContext(ctx) - options.LogTo.Register(api.WatchLogger) var ( rules []watchRule diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index e7492f2bce7..c009fdfc04e 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -71,9 +71,6 @@ func (s stdLogger) Status(containerName, msg string) { fmt.Printf("%s: %s\n", containerName, msg) } -func (s stdLogger) Register(containerName string) { -} - func TestWatch_Sync(t *testing.T) { mockCtrl := gomock.NewController(t) cli := mocks.NewMockCli(mockCtrl) diff --git a/pkg/prompt/prompt_mock.go b/pkg/prompt/prompt_mock.go index 6b776741510..83b0ff1189b 100644 --- a/pkg/prompt/prompt_mock.go +++ b/pkg/prompt/prompt_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Container: github.com/docker/compose-cli/pkg/prompt (interfaces: UI) +// Source: github.com/docker/compose-cli/pkg/prompt (interfaces: UI) // Package prompt is a generated GoMock package. package prompt From d3551ad7cc6255b84e38e43d22fe96f2ce4ed841 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 5 Jun 2025 15:45:51 +0200 Subject: [PATCH 2/2] use logs API with Since to collect the very first logs after restart Signed-off-by: Nicolas De Loof --- cmd/formatter/logs.go | 4 --- pkg/api/api.go | 2 +- pkg/compose/convergence.go | 9 ++++-- pkg/compose/logs.go | 13 +++++--- pkg/compose/monitor.go | 61 +++++++++++++++++++++---------------- pkg/compose/printer.go | 62 ++++++-------------------------------- pkg/compose/up.go | 49 +++++++++++++++++++----------- 7 files changed, 92 insertions(+), 108 deletions(-) diff --git a/cmd/formatter/logs.go b/cmd/formatter/logs.go index 2faa77b65e0..430cf1b0392 100644 --- a/cmd/formatter/logs.go +++ b/cmd/formatter/logs.go @@ -183,7 +183,3 @@ func (l logDecorator) Status(container, msg string) { l.decorated.Status(container, msg) l.After() } - -func (l logDecorator) Register(container string) { - l.decorated.Register(container) -} diff --git a/pkg/api/api.go b/pkg/api/api.go index 0bec86a6e13..b57a142a617 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -667,7 +667,7 @@ type ContainerEvent struct { ID string Service string Line string - // ContainerEventExited only + // ExitCode is only set on ContainerEventExited events ExitCode int Restarting bool } diff --git a/pkg/compose/convergence.go b/pkg/compose/convergence.go index a4b0d0e6ca7..c43f6cc76fb 100644 --- a/pkg/compose/convergence.go +++ b/pkg/compose/convergence.go @@ -635,13 +635,18 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P if inherit { inherited = &replaced } + + replacedContainerName := service.ContainerName + if replacedContainerName == "" { + replacedContainerName = service.Name + api.Separator + strconv.Itoa(number) + } name := getContainerName(project.Name, service, number) tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name) opts := createOptions{ AutoRemove: false, AttachStdin: false, UseNetworkAliases: true, - Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID), + Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName), } created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w) if err != nil { @@ -659,7 +664,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P return created, err } - err = s.apiClient().ContainerRename(ctx, created.ID, name) + err = s.apiClient().ContainerRename(ctx, tmpName, name) if err != nil { return created, err } diff --git a/pkg/compose/logs.go b/pkg/compose/logs.go index 2976dace4b2..b3b44d53e11 100644 --- a/pkg/compose/logs.go +++ b/pkg/compose/logs.go @@ -19,7 +19,6 @@ package compose import ( "context" "io" - "time" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" @@ -73,9 +72,14 @@ func (s *composeService) Logs( if options.Follow { printer := newLogPrinter(consumer) - eg.Go(printer.Run) - monitor := newMonitor(s.apiClient(), options.Project) + monitor := newMonitor(s.apiClient(), projectName) + if len(options.Services) > 0 { + monitor.withServices(options.Services) + } else if options.Project != nil { + monitor.withServices(options.Project.ServiceNames()) + } + monitor.withListener(printer.HandleEvent) monitor.withListener(func(event api.ContainerEvent) { if event.Type == api.ContainerEventStarted { eg.Go(func() error { @@ -86,7 +90,7 @@ func (s *composeService) Logs( err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{ Follow: options.Follow, - Since: time.Unix(0, event.Time).Format(time.RFC3339Nano), + Since: ctr.State.StartedAt, Until: options.Until, Tail: options.Tail, Timestamps: options.Timestamps, @@ -100,7 +104,6 @@ func (s *composeService) Logs( } }) eg.Go(func() error { - defer printer.Stop() return monitor.Start(ctx) }) } diff --git a/pkg/compose/monitor.go b/pkg/compose/monitor.go index 624f01d5d47..b0f9cc0affd 100644 --- a/pkg/compose/monitor.go +++ b/pkg/compose/monitor.go @@ -20,7 +20,6 @@ import ( "context" "strconv" - "github.com/compose-spec/compose-go/v2/types" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" @@ -34,23 +33,23 @@ import ( type monitor struct { api client.APIClient - project *types.Project + project string // services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command services map[string]bool listeners []api.ContainerEventListener } -func newMonitor(api client.APIClient, project *types.Project) *monitor { - services := map[string]bool{} - if project != nil { - for name := range project.Services { - services[name] = true - } - } +func newMonitor(api client.APIClient, project string) *monitor { return &monitor{ api: api, project: project, - services: services, + services: map[string]bool{}, + } +} + +func (c *monitor) withServices(services []string) { + for _, name := range services { + c.services[name] = true } } @@ -62,7 +61,7 @@ func (c *monitor) Start(ctx context.Context) error { initialState, err := c.api.ContainerList(ctx, container.ListOptions{ All: true, Filters: filters.NewArgs( - projectFilter(c.project.Name), + projectFilter(c.project), oneOffFilter(false), hasConfigHashLabel(), ), @@ -78,22 +77,24 @@ func (c *monitor) Start(ctx context.Context) error { containers.Add(ctr.ID) } } - restarting := utils.Set[string]{} evtCh, errCh := c.api.Events(ctx, events.ListOptions{ Filters: filters.NewArgs( filters.Arg("type", "container"), - projectFilter(c.project.Name)), + projectFilter(c.project)), }) for { + if len(containers) == 0 { + return nil + } select { case <-ctx.Done(): return nil case err := <-errCh: return err case event := <-evtCh: - if !c.services[event.Actor.Attributes[api.ServiceLabel]] { + if len(c.services) > 0 && !c.services[event.Actor.Attributes[api.ServiceLabel]] { continue } ctr, err := c.getContainerSummary(event) @@ -103,24 +104,35 @@ func (c *monitor) Start(ctx context.Context) error { switch event.Action { case events.ActionCreate: - containers.Add(ctr.ID) + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) + } + evtType := api.ContainerEventCreated + if _, ok := ctr.Labels[api.ContainerReplaceLabel]; ok { + evtType = api.ContainerEventRecreated + } for _, listener := range c.listeners { - listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated)) + listener(newContainerEvent(event.TimeNano, ctr, evtType)) } logrus.Debugf("container %s created", ctr.Name) case events.ActionStart: restarted := restarting.Has(ctr.ID) - for _, listener := range c.listeners { - listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { - e.Restarting = restarted - })) - } if restarted { logrus.Debugf("container %s restarted", ctr.Name) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { + e.Restarting = restarted + })) + } } else { logrus.Debugf("container %s started", ctr.Name) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted)) + } + } + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) } - containers.Add(ctr.ID) case events.ActionRestart: for _, listener := range c.listeners { listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted)) @@ -159,9 +171,6 @@ func (c *monitor) Start(ctx context.Context) error { } } } - if len(containers) == 0 { - return nil - } } } @@ -192,7 +201,7 @@ func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSumma ctr := &api.ContainerSummary{ ID: event.Actor.ID, Name: event.Actor.Attributes["name"], - Project: c.project.Name, + Project: c.project, Service: event.Actor.Attributes[api.ServiceLabel], Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us } diff --git a/pkg/compose/printer.go b/pkg/compose/printer.go index 55084270ab5..079736869f5 100644 --- a/pkg/compose/printer.go +++ b/pkg/compose/printer.go @@ -18,7 +18,6 @@ package compose import ( "fmt" - "sync" "github.com/docker/compose/v2/pkg/api" ) @@ -26,72 +25,29 @@ import ( // logPrinter watch application containers and collect their logs type logPrinter interface { HandleEvent(event api.ContainerEvent) - Run() error - Stop() } type printer struct { - queue chan api.ContainerEvent consumer api.LogConsumer - stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue - stop sync.Once } // newLogPrinter builds a LogPrinter passing containers logs to LogConsumer func newLogPrinter(consumer api.LogConsumer) logPrinter { printer := printer{ consumer: consumer, - queue: make(chan api.ContainerEvent), - stopCh: make(chan struct{}), - stop: sync.Once{}, } return &printer } -func (p *printer) Stop() { - p.stop.Do(func() { - close(p.stopCh) - for { - select { - case <-p.queue: - // purge the queue to free producers goroutines - // p.queue will be garbage collected - default: - return - } - } - }) -} - func (p *printer) HandleEvent(event api.ContainerEvent) { - select { - case <-p.stopCh: - return - default: - p.queue <- event - } -} - -func (p *printer) Run() error { - defer p.Stop() - - // containers we are tracking. Use true when container is running, false after we receive a stop|die signal - for { - select { - case <-p.stopCh: - return nil - case event := <-p.queue: - switch event.Type { - case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted: - p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) - if event.Type == api.ContainerEventRecreated { - p.consumer.Status(event.Source, "has been recreated") - } - case api.ContainerEventLog, api.HookEventLog: - p.consumer.Log(event.Source, event.Line) - case api.ContainerEventErr: - p.consumer.Err(event.Source, event.Line) - } - } + switch event.Type { + case api.ContainerEventExited: + p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) + case api.ContainerEventRecreated: + p.consumer.Status(event.Container.Labels[api.ContainerReplaceLabel], "has been recreated") + case api.ContainerEventLog, api.HookEventLog: + p.consumer.Log(event.Source, event.Line) + case api.ContainerEventErr: + p.consumer.Err(event.Source, event.Line) } } diff --git a/pkg/compose/up.go b/pkg/compose/up.go index 85fbe1ef669..04cf06e9972 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -18,6 +18,7 @@ package compose import ( "context" + "errors" "fmt" "os" "os/signal" @@ -31,6 +32,7 @@ import ( "github.com/docker/compose/v2/internal/tracing" "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/progress" + "github.com/docker/docker/errdefs" "github.com/eiannone/keyboard" "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" @@ -166,7 +168,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } } - monitor := newMonitor(s.apiClient(), project) + monitor := newMonitor(s.apiClient(), project.Name) + if len(options.Start.Services) > 0 { + monitor.withServices(options.Start.Services) + } else { + monitor.withServices(project.ServiceNames()) + } monitor.withListener(printer.HandleEvent) var exitCode int @@ -175,9 +182,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options // detect first container to exit to trigger application shutdown monitor.withListener(func(event api.ContainerEvent) { if once && event.Type == api.ContainerEventExited { + if options.Start.OnExit == api.CascadeFail && event.ExitCode == 0 { + return + } + once = false exitCode = event.ExitCode - printer.Stop() - _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") + _, _ = fmt.Fprintln(s.stdinfo(), progress.ErrorColor("Aborting on container exit...")) eg.Go(func() error { return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { return s.stop(ctx, project.Name, api.StopOptions{ @@ -186,7 +196,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options }, printer.HandleEvent) }, s.stdinfo(), logConsumer) }) - once = false } }) } @@ -203,29 +212,35 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } monitor.withListener(func(event api.ContainerEvent) { - mustAttach := false - switch event.Type { - case api.ContainerEventCreated: - // A container has been added to the application (scale) - mustAttach = true - case api.ContainerEventStarted: - // A container is restarting - need to re-attach - mustAttach = event.Restarting + if event.Type != api.ContainerEventStarted { + return } - if mustAttach { + if event.Restarting || event.Container.Labels[api.ContainerReplaceLabel] != "" { eg.Go(func() error { - // FIXME as container already started, we might miss the very first logs - return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + ctr, err := s.apiClient().ContainerInspect(ctx, event.ID) + if err != nil { + return err + } + + err = s.doLogContainer(ctx, options.Start.Attach, event.Source, ctr, api.LogOptions{ + Follow: true, + Since: ctr.State.StartedAt, + }) + var notImplErr errdefs.ErrNotImplemented + if errors.As(err, ¬ImplErr) { + // container may be configured with logging_driver: none + // as container already started, we might miss the very first logs. But still better than none + return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + } + return err }) } }) eg.Go(func() error { err := monitor.Start(ctx) - fmt.Println("monitor complete") // Signal for the signal-handler goroutines to stop close(doneCh) - printer.Stop() return err })