diff --git a/CHANGELOG.md b/CHANGELOG.md index e197dcc4fe..de4ab78c04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ All notable changes to `src-cli` are documented in this file. ## Unreleased +### Added + +- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#1154](https://github.com/sourcegraph/src-cli/pull/1154) + ## 6.1.0 - Support uploading GZIP compressed SCIP indexes [1146](https://github.com/sourcegraph/src-cli/pull/1146) - Remove deprecated `lsif` subcommand, and remove LSIF->SCIP conversion support [1147](https://github.com/sourcegraph/src-cli/pull/1147) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 8171156675..157af55927 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -98,6 +98,9 @@ type batchExecuteFlags struct { skipErrors bool runAsRoot bool + // If true, fail fast on first error instead of continuing execution + failFast bool + // EXPERIMENTAL textOnly bool } @@ -164,6 +167,11 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc "If true, forces all step containers to run as root.", ) + flagSet.BoolVar( + &caf.failFast, "fail-fast", false, + "Halts execution immediately upon first error instead of continuing with other tasks.", + ) + return caf } @@ -430,6 +438,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error TempDir: opts.flags.tempDir, GlobalEnv: os.Environ(), ForceRoot: opts.flags.runAsRoot, + FailFast: opts.flags.failFast, BinaryDiffs: ffs.BinaryDiffs, }, Logger: logManager, diff --git a/internal/batches/executor/coordinator.go b/internal/batches/executor/coordinator.go index 15855f5d37..746ea86d8e 100644 --- a/internal/batches/executor/coordinator.go +++ b/internal/batches/executor/coordinator.go @@ -14,7 +14,7 @@ import ( type taskExecutor interface { Start(context.Context, []*Task, TaskExecutionUI) - Wait(context.Context) ([]taskResult, error) + Wait() ([]taskResult, error) } // Coordinator coordinates the execution of Tasks. It makes use of an executor, @@ -182,7 +182,7 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch // Run executor. c.exec.Start(ctx, tasks, ui) - results, errs := c.exec.Wait(ctx) + results, errs := c.exec.Wait() // Write all step cache results to the cache. for _, res := range results { diff --git a/internal/batches/executor/coordinator_test.go b/internal/batches/executor/coordinator_test.go index 9ce252cda3..53e17c0fab 100644 --- a/internal/batches/executor/coordinator_test.go +++ b/internal/batches/executor/coordinator_test.go @@ -481,7 +481,7 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) { // expectCachedResultForStep returns a function that can be used as a // startCallback on dummyExecutor to assert that the first Task has no cached results. func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) { - return func(c context.Context, tasks []*Task, ui TaskExecutionUI) { + return func(_ context.Context, tasks []*Task, ui TaskExecutionUI) { t.Helper() task := tasks[0] @@ -562,7 +562,7 @@ func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionU // "noop noop noop", the crowd screams } -func (d *dummyExecutor) Wait(context.Context) ([]taskResult, error) { +func (d *dummyExecutor) Wait() ([]taskResult, error) { return d.results, d.waitErr } diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 9ccb94a8fc..45fb7307c2 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -3,10 +3,9 @@ package executor import ( "context" "fmt" - "sync" "time" - "github.com/neelance/parallel" + "github.com/sourcegraph/conc/pool" "github.com/sourcegraph/sourcegraph/lib/errors" @@ -69,6 +68,7 @@ type NewExecutorOpts struct { IsRemote bool GlobalEnv []string ForceRoot bool + FailFast bool BinaryDiffs bool } @@ -76,19 +76,14 @@ type NewExecutorOpts struct { type executor struct { opts NewExecutorOpts - par *parallel.Run + workPool *pool.ResultContextPool[*taskResult] doneEnqueuing chan struct{} - - results []taskResult - resultsMu sync.Mutex } func NewExecutor(opts NewExecutorOpts) *executor { return &executor{ - opts: opts, - + opts: opts, doneEnqueuing: make(chan struct{}), - par: parallel.NewRun(opts.Parallelism), } } @@ -97,6 +92,11 @@ func NewExecutor(opts NewExecutorOpts) *executor { func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) { defer func() { close(x.doneEnqueuing) }() + x.workPool = pool.NewWithResults[*taskResult]().WithMaxGoroutines(x.opts.Parallelism).WithContext(ctx) + if x.opts.FailFast { + x.workPool = x.workPool.WithCancelOnError() + } + for _, task := range tasks { select { case <-ctx.Done(): @@ -104,48 +104,33 @@ func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) default: } - x.par.Acquire() - - go func(task *Task, ui TaskExecutionUI) { - defer x.par.Release() - - select { - case <-ctx.Done(): - return - default: - err := x.do(ctx, task, ui) - if err != nil { - x.par.Error(err) - } - } - }(task, ui) + x.workPool.Go(func(c context.Context) (*taskResult, error) { + return x.do(c, task, ui) + }) } } // Wait blocks until all Tasks enqueued with Start have been executed. -func (x *executor) Wait(ctx context.Context) ([]taskResult, error) { +func (x *executor) Wait() ([]taskResult, error) { <-x.doneEnqueuing - result := make(chan error, 1) - - go func(ch chan error) { - ch <- x.par.Wait() - }(result) - - select { - case <-ctx.Done(): - return x.results, ctx.Err() - case err := <-result: - close(result) - if err != nil { - return x.results, err + r, err := x.workPool.Wait() + results := make([]taskResult, len(r)) + for i, r := range r { + if r == nil { + results[i] = taskResult{ + task: nil, + stepResults: nil, + err: err, + } + } else { + results[i] = *r } } - - return x.results, nil + return results, err } -func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err error) { +func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (result *taskResult, err error) { // Ensure that the status is updated when we're done. defer func() { ui.TaskFinished(task, err) @@ -157,7 +142,7 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err // Let's set up our logging. l, err := x.opts.Logger.AddTask(util.SlugForPathInRepo(task.Repository.Name, task.Repository.Rev(), task.Path)) if err != nil { - return errors.Wrap(err, "creating log file") + return nil, errors.Wrap(err, "creating log file") } defer l.Close() @@ -196,18 +181,10 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err } l.MarkErrored() } - x.addResult(task, stepResults, err) - - return err -} - -func (x *executor) addResult(task *Task, stepResults []execution.AfterStepResult, err error) { - x.resultsMu.Lock() - defer x.resultsMu.Unlock() - x.results = append(x.results, taskResult{ + return &taskResult{ task: task, stepResults: stepResults, err: err, - }) + }, err } diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index c4accc0f87..2df8c21ea0 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -78,6 +78,8 @@ func TestExecutor_Integration(t *testing.T) { wantFinishedWithErr int wantCacheCount int + + failFast bool }{ { name: "success", @@ -316,6 +318,39 @@ func TestExecutor_Integration(t *testing.T) { wantFinishedWithErr: 1, wantCacheCount: 2, }, + { + name: "fail fast mode", + archives: []mock.RepoArchive{ + {RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{ + "README.md": "# Welcome to the README\n", + }}, + {RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{ + "README.md": "# Sourcegraph README\n", + }}, + }, + steps: []batcheslib.Step{ + { + Run: `exit 1`, + // We must fail for the first repository, so that the other repo's work is cancelled in fail fast mode. + If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name), + }, + { + // We introduce an artificial way for the second repository, so that it can't complete before the failure of the first one. + Run: `sleep 0.1`, + If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo2.Name), + }, + {Run: `echo -e "foobar\n" >> README.md`}, + }, + tasks: []*Task{ + {Repository: testRepo1}, + {Repository: testRepo2}, + }, + wantErrInclude: "execution in github.com/sourcegraph/src-cli failed: run: exit 1", + // In fail fast mode, we expect that other steps are cancelled. + wantFinished: 0, + wantFinishedWithErr: 2, + failFast: true, + }, { name: "mount path", archives: []mock.RepoArchive{ @@ -376,8 +411,13 @@ func TestExecutor_Integration(t *testing.T) { // Temp dir for log files and downloaded archives testTempDir := t.TempDir() - cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) + ctx := context.Background() + cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images) // Setup executor + parallelism := 0 + if tc.failFast { + parallelism = 1 + } opts := NewExecutorOpts{ Creator: cr, RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), @@ -385,8 +425,9 @@ func TestExecutor_Integration(t *testing.T) { EnsureImage: imageMapEnsurer(images), TempDir: testTempDir, - Parallelism: runtime.GOMAXPROCS(0), + Parallelism: runtime.GOMAXPROCS(parallelism), Timeout: tc.executorTimeout, + FailFast: tc.failFast, } if opts.Timeout == 0 { @@ -397,9 +438,9 @@ func TestExecutor_Integration(t *testing.T) { executor := NewExecutor(opts) // Run executor - executor.Start(context.Background(), tc.tasks, dummyUI) + executor.Start(ctx, tc.tasks, dummyUI) - results, err := executor.Wait(context.Background()) + results, err := executor.Wait() if tc.wantErrInclude == "" { if err != nil { t.Fatalf("execution failed: %s", err) @@ -490,10 +531,10 @@ func TestExecutor_Integration(t *testing.T) { // Make sure that all the Tasks have been updated correctly if have, want := len(dummyUI.finished), tc.wantFinished; have != want { - t.Fatalf("wrong number of finished tasks. want=%d, have=%d", want, have) + t.Fatalf("wrong number of UI finished tasks. want=%d, have=%d", want, have) } if have, want := len(dummyUI.finishedWithErr), tc.wantFinishedWithErr; have != want { - t.Fatalf("wrong number of finished-with-err tasks. want=%d, have=%d", want, have) + t.Fatalf("wrong number of UI finished-with-err tasks. want=%d, have=%d", want, have) } }) } @@ -797,7 +838,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) } } - cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) + ctx := context.Background() + cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images) // Setup executor executor := NewExecutor(NewExecutorOpts{ Creator: cr, @@ -810,8 +852,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) Timeout: 30 * time.Second, }) - executor.Start(context.Background(), tasks, newDummyTaskExecutionUI()) - return executor.Wait(context.Background()) + executor.Start(ctx, tasks, newDummyTaskExecutionUI()) + return executor.Wait() } func imageMapEnsurer(m map[string]docker.Image) imageEnsurer {