From da8e8d13caaa2f39fafeef88bb02344821a3af12 Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Wed, 19 Mar 2025 15:53:15 +0100 Subject: [PATCH 1/8] wip --- CHANGELOG.md | 4 + cmd/src/batch_common.go | 9 ++ internal/batches/executor/executor.go | 1 + internal/batches/executor/executor_test.go | 171 +++++++++++++++++++++ 4 files changed, 185 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e197dcc4fe..80c1c9615f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ All notable changes to `src-cli` are documented in this file. ## Unreleased +### Added + +- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#TBD](https://github.com/sourcegraph/src-cli/pull/TBD) + ## 6.1.0 - Support uploading GZIP compressed SCIP indexes [1146](https://github.com/sourcegraph/src-cli/pull/1146) - Remove deprecated `lsif` subcommand, and remove LSIF->SCIP conversion support [1147](https://github.com/sourcegraph/src-cli/pull/1147) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 8171156675..c49952d7d7 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -98,6 +98,9 @@ type batchExecuteFlags struct { skipErrors bool runAsRoot bool + // If true, fail fast on first error instead of continuing execution + failFast bool + // EXPERIMENTAL textOnly bool } @@ -164,6 +167,11 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc "If true, forces all step containers to run as root.", ) + flagSet.BoolVar( + &caf.failFast, "fail-fast", false, + "If true, fails fast and halts execution immediately upon first error instead of continuing with other repositories.", + ) + return caf } @@ -430,6 +438,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error TempDir: opts.flags.tempDir, GlobalEnv: os.Environ(), ForceRoot: opts.flags.runAsRoot, + FailFast: opts.flags.failFast, BinaryDiffs: ffs.BinaryDiffs, }, Logger: logManager, diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 9ccb94a8fc..fc30236abe 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -69,6 +69,7 @@ type NewExecutorOpts struct { IsRemote bool GlobalEnv []string ForceRoot bool + FailFast bool BinaryDiffs bool } diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index c4accc0f87..3fa748cf1f 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "runtime" "strings" + "sync" "testing" "time" @@ -770,6 +771,122 @@ index 3040106..5f2f924 100644 }) } +func TestExecutor_FailFast(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Test doesn't work on Windows because dummydocker is written in bash") + } + + addToPath(t, "testdata/dummydocker") + + defaultBatchChangeAttributes := &template.BatchChangeAttributes{ + Name: "fail-fast-test-batch-change", + Description: "this tests fail-fast functionality", + } + + archives := []mock.RepoArchive{ + {RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{ + "README.md": "# Welcome to the README\n", + }}, + {RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{ + "README.md": "# Sourcegraph README\n", + }}, + } + + steps := []batcheslib.Step{ + {Run: `echo "success" >> success.txt`}, + {Run: `exit 1`}, // This step will fail + } + + tasks := []*Task{ + {Repository: testRepo1, BatchChangeAttributes: defaultBatchChangeAttributes, Steps: steps}, + {Repository: testRepo2, BatchChangeAttributes: defaultBatchChangeAttributes, Steps: steps}, + } + + // Setup a mock test server + mux := mock.NewZipArchivesMux(t, nil, archives...) + ts := httptest.NewServer(mux) + defer ts.Close() + + // Setup an api.Client that points to this test server + var clientBuffer bytes.Buffer + client := api.NewClient(api.ClientOpts{Endpoint: ts.URL, Out: &clientBuffer}) + + // Temp dir for log files and downloaded archives + testTempDir := t.TempDir() + + // Create test images + images := make(map[string]docker.Image) + for _, step := range steps { + images[step.Container] = &mock.Image{RawDigest: step.Container} + } + + cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) + + t.Run("without fail-fast", func(t *testing.T) { + // Setup executor without fail-fast + opts := NewExecutorOpts{ + Creator: cr, + RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), + Logger: mock.LogNoOpManager{}, + EnsureImage: imageMapEnsurer(images), + TempDir: testTempDir, + Parallelism: 1, // Use sequential execution to ensure predictable order + Timeout: 30 * time.Second, + FailFast: false, // Explicitly set to false + } + + dummyUI := newDummyTaskExecutionUI() + executor := NewExecutor(opts) + + // Run executor + ctx := context.Background() + executor.Start(ctx, tasks, dummyUI) + results, err := executor.Wait(ctx) + + // Without fail-fast, we should have errors but still process all tasks + require.Error(t, err, "execution should have failed") + require.Len(t, results, 2, "should have results for both tasks") + require.Len(t, dummyUI.finishedWithErr, 2, "both tasks should have finished with error") + }) + + t.Run("with fail-fast", func(t *testing.T) { + // Setup executor with fail-fast + opts := NewExecutorOpts{ + Creator: cr, + RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), + Logger: mock.LogNoOpManager{}, + EnsureImage: imageMapEnsurer(images), + TempDir: testTempDir, + Parallelism: 1, // Use sequential execution to ensure predictable order + Timeout: 30 * time.Second, + FailFast: true, // Enable fail-fast + } + + dummyUI := newDummyTaskExecutionUI2() + executor := NewExecutor(opts) + + // Run executor + ctx := context.Background() + executor.Start(ctx, tasks, dummyUI) + results, err := executor.Wait(ctx) + + // With fail-fast, we should still have error but only process the first task + require.Error(t, err, "execution should have failed") + + // Check that we have at least one result (from the first task) + require.NotEmpty(t, results, "should have at least one result") + + // The key check: we should have exactly one task that finished with error + // This verifies that execution stopped after the first error + require.Equal(t, 1, len(dummyUI.finishedWithErr), "only the first task should have finished with error") + + // Additionally, verify that the second task was never started or at least not finished + // This is a more precise check for the fail-fast behavior + _, secondTaskFinished := dummyUI.finishedWithErr[tasks[1]] + require.False(t, secondTaskFinished, "second task should not have finished with error due to fail-fast") + }) +} + func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) ([]taskResult, error) { if runtime.GOOS == "windows" { t.Skip("Test doesn't work on Windows because dummydocker is written in bash") @@ -822,3 +939,57 @@ func imageMapEnsurer(m map[string]docker.Image) imageEnsurer { return nil, errors.New(fmt.Sprintf("image for %s not found", container)) } } + +var _ TaskExecutionUI = &dummyTaskExecutionUI2{} + +type dummyTaskExecutionUI2 struct { + mu sync.Mutex + + started map[*Task]struct{} + finished map[*Task]struct{} + finishedWithErr map[*Task]struct{} + specs map[*Task][]*batcheslib.ChangesetSpec +} + +func newDummyTaskExecutionUI2() *dummyTaskExecutionUI2 { + return &dummyTaskExecutionUI2{ + started: map[*Task]struct{}{}, + finished: map[*Task]struct{}{}, + finishedWithErr: map[*Task]struct{}{}, + specs: map[*Task][]*batcheslib.ChangesetSpec{}, + } +} + +func (d *dummyTaskExecutionUI2) Start([]*Task) {} +func (d *dummyTaskExecutionUI2) Success() {} +func (d *dummyTaskExecutionUI2) Failed(err error) {} + +func (d *dummyTaskExecutionUI2) TaskStarted(t *Task) { + d.mu.Lock() + defer d.mu.Unlock() + + d.started[t] = struct{}{} +} + +func (d *dummyTaskExecutionUI2) TaskFinished(t *Task, err error) { + d.mu.Lock() + defer d.mu.Unlock() + + delete(d.started, t) + if err != nil { + d.finishedWithErr[t] = struct{}{} + } else { + d.finished[t] = struct{}{} + } +} + +func (d *dummyTaskExecutionUI2) TaskChangesetSpecsBuilt(t *Task, specs []*batcheslib.ChangesetSpec) { + d.mu.Lock() + defer d.mu.Unlock() + + d.specs[t] = specs +} + +func (d *dummyTaskExecutionUI2) StepsExecutionUI(t *Task) StepsExecutionUI { + return NoopStepsExecUI{} +} From c58470eb9c2f9ed4de97cc0f9783b1d96692053f Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Wed, 19 Mar 2025 17:21:41 +0100 Subject: [PATCH 2/8] wip --- cmd/src/batch_common.go | 1 + internal/batches/executor/coordinator.go | 8 +- internal/batches/executor/coordinator_test.go | 16 +- internal/batches/executor/executor.go | 84 +++---- internal/batches/executor/executor_test.go | 235 ++++-------------- 5 files changed, 100 insertions(+), 244 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index c49952d7d7..930dc2b5f1 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -428,6 +428,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error coord := executor.NewCoordinator( executor.NewCoordinatorOpts{ ExecOpts: executor.NewExecutorOpts{ + Context: ctx, Logger: logManager, RepoArchiveRegistry: archiveRegistry, Creator: workspaceCreator, diff --git a/internal/batches/executor/coordinator.go b/internal/batches/executor/coordinator.go index 15855f5d37..dcab600296 100644 --- a/internal/batches/executor/coordinator.go +++ b/internal/batches/executor/coordinator.go @@ -13,8 +13,8 @@ import ( ) type taskExecutor interface { - Start(context.Context, []*Task, TaskExecutionUI) - Wait(context.Context) ([]taskResult, error) + Start([]*Task, TaskExecutionUI) + Wait() ([]taskResult, error) } // Coordinator coordinates the execution of Tasks. It makes use of an executor, @@ -181,8 +181,8 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch ui.Start(tasks) // Run executor. - c.exec.Start(ctx, tasks, ui) - results, errs := c.exec.Wait(ctx) + c.exec.Start(tasks, ui) + results, errs := c.exec.Wait() // Write all step cache results to the cache. for _, res := range results { diff --git a/internal/batches/executor/coordinator_test.go b/internal/batches/executor/coordinator_test.go index 9ce252cda3..e5b8a6dfe7 100644 --- a/internal/batches/executor/coordinator_test.go +++ b/internal/batches/executor/coordinator_test.go @@ -402,7 +402,7 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) { task.Steps[1].Run = `echo "two modified"` // Re-execution should start with the diff produced by steps[0] as the // start state from which steps[1] is then re-executed. - execAndEnsure(t, coord, executor, batchSpec, task, func(ctx context.Context, t []*Task, teu TaskExecutionUI) {}) + execAndEnsure(t, coord, executor, batchSpec, task, func(t []*Task, teu TaskExecutionUI) {}) // Cache now contains old entries, plus another "complete task" entry and // two entries for newly executed steps. assertCacheSize(t, cache, 5) @@ -413,7 +413,7 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) { // Change the 3rd step's definition: task.Steps[2].Run = `echo "three modified"` // Re-execution should use the diff from steps[1] as start state - execAndEnsure(t, coord, executor, batchSpec, task, func(ctx context.Context, t []*Task, teu TaskExecutionUI) {}) + execAndEnsure(t, coord, executor, batchSpec, task, func(t []*Task, teu TaskExecutionUI) {}) // Cache now contains old entries, plus another "complete task" entry and // a single new step entry assertCacheSize(t, cache, 6) @@ -480,8 +480,8 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) { // expectCachedResultForStep returns a function that can be used as a // startCallback on dummyExecutor to assert that the first Task has no cached results. -func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) { - return func(c context.Context, tasks []*Task, ui TaskExecutionUI) { +func assertNoCachedResult(t *testing.T) func([]*Task, TaskExecutionUI) { + return func(tasks []*Task, ui TaskExecutionUI) { t.Helper() task := tasks[0] @@ -491,7 +491,7 @@ func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecu } } -type startCallback func(context.Context, []*Task, TaskExecutionUI) +type startCallback func([]*Task, TaskExecutionUI) var _ TaskExecutionUI = &dummyTaskExecutionUI{} @@ -554,15 +554,15 @@ type dummyExecutor struct { waitErr error } -func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionUI) { +func (d *dummyExecutor) Start(ts []*Task, ui TaskExecutionUI) { if d.startCb != nil { - d.startCb(ctx, ts, ui) + d.startCb(ts, ui) d.startCbCalled = true } // "noop noop noop", the crowd screams } -func (d *dummyExecutor) Wait(context.Context) ([]taskResult, error) { +func (d *dummyExecutor) Wait() ([]taskResult, error) { return d.results, d.waitErr } diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index fc30236abe..499566e2cb 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -3,11 +3,9 @@ package executor import ( "context" "fmt" - "sync" + "github.com/sourcegraph/conc/pool" "time" - "github.com/neelance/parallel" - "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/src-cli/internal/batches/docker" @@ -72,81 +70,71 @@ type NewExecutorOpts struct { FailFast bool BinaryDiffs bool + Context context.Context } type executor struct { opts NewExecutorOpts - par *parallel.Run + workPool *pool.ResultContextPool[*taskResult] doneEnqueuing chan struct{} - results []taskResult - resultsMu sync.Mutex + results []taskResult } func NewExecutor(opts NewExecutorOpts) *executor { + p := pool.NewWithResults[*taskResult]().WithMaxGoroutines(opts.Parallelism).WithContext(opts.Context) + if opts.FailFast { + p = p.WithCancelOnError() + } return &executor{ opts: opts, doneEnqueuing: make(chan struct{}), - par: parallel.NewRun(opts.Parallelism), + workPool: p, } } // Start starts the execution of the given Tasks in goroutines, calling the // given taskStatusHandler to update the progress of the tasks. -func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) { +func (x *executor) Start(tasks []*Task, ui TaskExecutionUI) { defer func() { close(x.doneEnqueuing) }() for _, task := range tasks { select { - case <-ctx.Done(): + case <-x.opts.Context.Done(): return default: } - x.par.Acquire() - - go func(task *Task, ui TaskExecutionUI) { - defer x.par.Release() - - select { - case <-ctx.Done(): - return - default: - err := x.do(ctx, task, ui) - if err != nil { - x.par.Error(err) - } - } - }(task, ui) + t := task + x.workPool.Go(func(c context.Context) (*taskResult, error) { + return x.do(c, t, ui) + }) } } // Wait blocks until all Tasks enqueued with Start have been executed. -func (x *executor) Wait(ctx context.Context) ([]taskResult, error) { +func (x *executor) Wait() ([]taskResult, error) { <-x.doneEnqueuing - result := make(chan error, 1) - - go func(ch chan error) { - ch <- x.par.Wait() - }(result) - - select { - case <-ctx.Done(): - return x.results, ctx.Err() - case err := <-result: - close(result) - if err != nil { - return x.results, err + r, err := x.workPool.Wait() + results := make([]taskResult, len(r)) + for i, r := range r { + if r == nil { + results[i] = taskResult{ + task: nil, + stepResults: nil, + err: err, + } + } else { + results[i] = *r } } - - return x.results, nil + return results, err } -func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err error) { +func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (result *taskResult, err error) { // Ensure that the status is updated when we're done. defer func() { ui.TaskFinished(task, err) @@ -158,7 +146,7 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err // Let's set up our logging. l, err := x.opts.Logger.AddTask(util.SlugForPathInRepo(task.Repository.Name, task.Repository.Rev(), task.Path)) if err != nil { - return errors.Wrap(err, "creating log file") + return nil, errors.Wrap(err, "creating log file") } defer l.Close() @@ -197,18 +185,10 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err } l.MarkErrored() } - x.addResult(task, stepResults, err) - - return err -} - -func (x *executor) addResult(task *Task, stepResults []execution.AfterStepResult, err error) { - x.resultsMu.Lock() - defer x.resultsMu.Unlock() - x.results = append(x.results, taskResult{ + return &taskResult{ task: task, stepResults: stepResults, err: err, - }) + }, err } diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 3fa748cf1f..198ab495d4 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -10,7 +10,6 @@ import ( "path/filepath" "runtime" "strings" - "sync" "testing" "time" @@ -79,6 +78,8 @@ func TestExecutor_Integration(t *testing.T) { wantFinishedWithErr int wantCacheCount int + + failFast bool }{ { name: "success", @@ -317,6 +318,41 @@ func TestExecutor_Integration(t *testing.T) { wantFinishedWithErr: 1, wantCacheCount: 2, }, + { + name: "fail fast mode", + archives: []mock.RepoArchive{ + {RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{ + "README.md": "# Welcome to the README\n", + }}, + {RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{ + "README.md": "# Sourcegraph README\n", + }}, + }, + steps: []batcheslib.Step{ + { + Run: `exit 1`, + If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name), + }, + {Run: `echo -e "foobar\n" >> README.md`}, + }, + tasks: []*Task{ + {Repository: testRepo1}, + {Repository: testRepo2}, + }, + //wantFilesChanged: filesByRepository{ + // testRepo1.ID: {}, + // testRepo2.ID: filesByPath{ + // rootPath: []string{"README.md"}, + // }, + //}, + wantErrInclude: "execution in github.com/sourcegraph/src-cli failed: run: exit 1", + // In fail fast mode, we expect only the failing task to be processed + // and no successful completions + wantFinished: 0, + wantFinishedWithErr: 1, + //wantCacheCount: 1, + failFast: true, // Enable fail fast mode + }, { name: "mount path", archives: []mock.RepoArchive{ @@ -377,17 +413,24 @@ func TestExecutor_Integration(t *testing.T) { // Temp dir for log files and downloaded archives testTempDir := t.TempDir() - cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) + ctx := context.Background() + cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images) // Setup executor + parallelism := 0 + if tc.failFast { + parallelism = 1 + } opts := NewExecutorOpts{ + Context: ctx, Creator: cr, RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), Logger: mock.LogNoOpManager{}, EnsureImage: imageMapEnsurer(images), TempDir: testTempDir, - Parallelism: runtime.GOMAXPROCS(0), + Parallelism: runtime.GOMAXPROCS(parallelism), Timeout: tc.executorTimeout, + FailFast: tc.failFast, } if opts.Timeout == 0 { @@ -398,9 +441,9 @@ func TestExecutor_Integration(t *testing.T) { executor := NewExecutor(opts) // Run executor - executor.Start(context.Background(), tc.tasks, dummyUI) + executor.Start(tc.tasks, dummyUI) - results, err := executor.Wait(context.Background()) + results, err := executor.Wait() if tc.wantErrInclude == "" { if err != nil { t.Fatalf("execution failed: %s", err) @@ -491,10 +534,10 @@ func TestExecutor_Integration(t *testing.T) { // Make sure that all the Tasks have been updated correctly if have, want := len(dummyUI.finished), tc.wantFinished; have != want { - t.Fatalf("wrong number of finished tasks. want=%d, have=%d", want, have) + t.Fatalf("wrong number of UI finished tasks. want=%d, have=%d", want, have) } if have, want := len(dummyUI.finishedWithErr), tc.wantFinishedWithErr; have != want { - t.Fatalf("wrong number of finished-with-err tasks. want=%d, have=%d", want, have) + t.Fatalf("wrong number of UI finished-with-err tasks. want=%d, have=%d", want, have) } }) } @@ -771,122 +814,6 @@ index 3040106..5f2f924 100644 }) } -func TestExecutor_FailFast(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Test doesn't work on Windows because dummydocker is written in bash") - } - - addToPath(t, "testdata/dummydocker") - - defaultBatchChangeAttributes := &template.BatchChangeAttributes{ - Name: "fail-fast-test-batch-change", - Description: "this tests fail-fast functionality", - } - - archives := []mock.RepoArchive{ - {RepoName: testRepo1.Name, Commit: testRepo1.Rev(), Files: map[string]string{ - "README.md": "# Welcome to the README\n", - }}, - {RepoName: testRepo2.Name, Commit: testRepo2.Rev(), Files: map[string]string{ - "README.md": "# Sourcegraph README\n", - }}, - } - - steps := []batcheslib.Step{ - {Run: `echo "success" >> success.txt`}, - {Run: `exit 1`}, // This step will fail - } - - tasks := []*Task{ - {Repository: testRepo1, BatchChangeAttributes: defaultBatchChangeAttributes, Steps: steps}, - {Repository: testRepo2, BatchChangeAttributes: defaultBatchChangeAttributes, Steps: steps}, - } - - // Setup a mock test server - mux := mock.NewZipArchivesMux(t, nil, archives...) - ts := httptest.NewServer(mux) - defer ts.Close() - - // Setup an api.Client that points to this test server - var clientBuffer bytes.Buffer - client := api.NewClient(api.ClientOpts{Endpoint: ts.URL, Out: &clientBuffer}) - - // Temp dir for log files and downloaded archives - testTempDir := t.TempDir() - - // Create test images - images := make(map[string]docker.Image) - for _, step := range steps { - images[step.Container] = &mock.Image{RawDigest: step.Container} - } - - cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) - - t.Run("without fail-fast", func(t *testing.T) { - // Setup executor without fail-fast - opts := NewExecutorOpts{ - Creator: cr, - RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), - Logger: mock.LogNoOpManager{}, - EnsureImage: imageMapEnsurer(images), - TempDir: testTempDir, - Parallelism: 1, // Use sequential execution to ensure predictable order - Timeout: 30 * time.Second, - FailFast: false, // Explicitly set to false - } - - dummyUI := newDummyTaskExecutionUI() - executor := NewExecutor(opts) - - // Run executor - ctx := context.Background() - executor.Start(ctx, tasks, dummyUI) - results, err := executor.Wait(ctx) - - // Without fail-fast, we should have errors but still process all tasks - require.Error(t, err, "execution should have failed") - require.Len(t, results, 2, "should have results for both tasks") - require.Len(t, dummyUI.finishedWithErr, 2, "both tasks should have finished with error") - }) - - t.Run("with fail-fast", func(t *testing.T) { - // Setup executor with fail-fast - opts := NewExecutorOpts{ - Creator: cr, - RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), - Logger: mock.LogNoOpManager{}, - EnsureImage: imageMapEnsurer(images), - TempDir: testTempDir, - Parallelism: 1, // Use sequential execution to ensure predictable order - Timeout: 30 * time.Second, - FailFast: true, // Enable fail-fast - } - - dummyUI := newDummyTaskExecutionUI2() - executor := NewExecutor(opts) - - // Run executor - ctx := context.Background() - executor.Start(ctx, tasks, dummyUI) - results, err := executor.Wait(ctx) - - // With fail-fast, we should still have error but only process the first task - require.Error(t, err, "execution should have failed") - - // Check that we have at least one result (from the first task) - require.NotEmpty(t, results, "should have at least one result") - - // The key check: we should have exactly one task that finished with error - // This verifies that execution stopped after the first error - require.Equal(t, 1, len(dummyUI.finishedWithErr), "only the first task should have finished with error") - - // Additionally, verify that the second task was never started or at least not finished - // This is a more precise check for the fail-fast behavior - _, secondTaskFinished := dummyUI.finishedWithErr[tasks[1]] - require.False(t, secondTaskFinished, "second task should not have finished with error due to fail-fast") - }) -} - func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) ([]taskResult, error) { if runtime.GOOS == "windows" { t.Skip("Test doesn't work on Windows because dummydocker is written in bash") @@ -914,9 +841,11 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) } } - cr, _ := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images) + ctx := context.Background() + cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images) // Setup executor executor := NewExecutor(NewExecutorOpts{ + Context: ctx, Creator: cr, RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), Logger: mock.LogNoOpManager{}, @@ -927,8 +856,8 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) Timeout: 30 * time.Second, }) - executor.Start(context.Background(), tasks, newDummyTaskExecutionUI()) - return executor.Wait(context.Background()) + executor.Start(tasks, newDummyTaskExecutionUI()) + return executor.Wait() } func imageMapEnsurer(m map[string]docker.Image) imageEnsurer { @@ -939,57 +868,3 @@ func imageMapEnsurer(m map[string]docker.Image) imageEnsurer { return nil, errors.New(fmt.Sprintf("image for %s not found", container)) } } - -var _ TaskExecutionUI = &dummyTaskExecutionUI2{} - -type dummyTaskExecutionUI2 struct { - mu sync.Mutex - - started map[*Task]struct{} - finished map[*Task]struct{} - finishedWithErr map[*Task]struct{} - specs map[*Task][]*batcheslib.ChangesetSpec -} - -func newDummyTaskExecutionUI2() *dummyTaskExecutionUI2 { - return &dummyTaskExecutionUI2{ - started: map[*Task]struct{}{}, - finished: map[*Task]struct{}{}, - finishedWithErr: map[*Task]struct{}{}, - specs: map[*Task][]*batcheslib.ChangesetSpec{}, - } -} - -func (d *dummyTaskExecutionUI2) Start([]*Task) {} -func (d *dummyTaskExecutionUI2) Success() {} -func (d *dummyTaskExecutionUI2) Failed(err error) {} - -func (d *dummyTaskExecutionUI2) TaskStarted(t *Task) { - d.mu.Lock() - defer d.mu.Unlock() - - d.started[t] = struct{}{} -} - -func (d *dummyTaskExecutionUI2) TaskFinished(t *Task, err error) { - d.mu.Lock() - defer d.mu.Unlock() - - delete(d.started, t) - if err != nil { - d.finishedWithErr[t] = struct{}{} - } else { - d.finished[t] = struct{}{} - } -} - -func (d *dummyTaskExecutionUI2) TaskChangesetSpecsBuilt(t *Task, specs []*batcheslib.ChangesetSpec) { - d.mu.Lock() - defer d.mu.Unlock() - - d.specs[t] = specs -} - -func (d *dummyTaskExecutionUI2) StepsExecutionUI(t *Task) StepsExecutionUI { - return NoopStepsExecUI{} -} From 04401e383bd1a433b68b2835bb031c657e9ceffb Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 09:54:40 +0100 Subject: [PATCH 3/8] wip --- internal/batches/executor/executor_test.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 198ab495d4..51950c27ed 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -331,7 +331,8 @@ func TestExecutor_Integration(t *testing.T) { steps: []batcheslib.Step{ { Run: `exit 1`, - If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name), + // We must fail for the first repository, so that the other repo's work is cancelled in fail fast mode. + If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name), }, {Run: `echo -e "foobar\n" >> README.md`}, }, @@ -339,19 +340,11 @@ func TestExecutor_Integration(t *testing.T) { {Repository: testRepo1}, {Repository: testRepo2}, }, - //wantFilesChanged: filesByRepository{ - // testRepo1.ID: {}, - // testRepo2.ID: filesByPath{ - // rootPath: []string{"README.md"}, - // }, - //}, wantErrInclude: "execution in github.com/sourcegraph/src-cli failed: run: exit 1", - // In fail fast mode, we expect only the failing task to be processed - // and no successful completions + // In fail fast mode, we expect that other steps are cancelled. wantFinished: 0, - wantFinishedWithErr: 1, - //wantCacheCount: 1, - failFast: true, // Enable fail fast mode + wantFinishedWithErr: 2, + failFast: true, }, { name: "mount path", From dd70029c53093d2930e679b9a9f4da346bbb8e1b Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 09:59:34 +0100 Subject: [PATCH 4/8] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80c1c9615f..de4ab78c04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ All notable changes to `src-cli` are documented in this file. ### Added -- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#TBD](https://github.com/sourcegraph/src-cli/pull/TBD) +- Batch Changes: Added `-fail-fast` flag to `src batch preview` and `src batch apply` that causes execution to immediately halt on the first error instead of continuing with other repositories. This enables faster iteration on batch specs. [#1154](https://github.com/sourcegraph/src-cli/pull/1154) ## 6.1.0 - Support uploading GZIP compressed SCIP indexes [1146](https://github.com/sourcegraph/src-cli/pull/1146) From d06a407c8894c6b61e697a113e6eb3e596ff244d Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 10:33:22 +0100 Subject: [PATCH 5/8] linting --- cmd/src/batch_common.go | 2 +- internal/batches/executor/executor.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 930dc2b5f1..986f8707f0 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -169,7 +169,7 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc flagSet.BoolVar( &caf.failFast, "fail-fast", false, - "If true, fails fast and halts execution immediately upon first error instead of continuing with other repositories.", + "Halts execution immediately upon first error instead of continuing with other tasks.", ) return caf diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 499566e2cb..e633d5d464 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -3,9 +3,9 @@ package executor import ( "context" "fmt" - "github.com/sourcegraph/conc/pool" "time" + "github.com/sourcegraph/conc/pool" "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/src-cli/internal/batches/docker" @@ -78,8 +78,6 @@ type executor struct { workPool *pool.ResultContextPool[*taskResult] doneEnqueuing chan struct{} - - results []taskResult } func NewExecutor(opts NewExecutorOpts) *executor { From 1932ec1a1cab72daf9e79afa2f1bce9426529737 Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 10:35:31 +0100 Subject: [PATCH 6/8] linting --- internal/batches/executor/executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index e633d5d464..9b2b4e28a0 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -6,6 +6,7 @@ import ( "time" "github.com/sourcegraph/conc/pool" + "github.com/sourcegraph/sourcegraph/lib/errors" "github.com/sourcegraph/src-cli/internal/batches/docker" From 13599a28889d7716f6b466e5f10e2e70fc69441e Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 16:15:37 +0100 Subject: [PATCH 7/8] pass ctx into Start --- cmd/src/batch_common.go | 1 - internal/batches/executor/coordinator.go | 4 ++-- internal/batches/executor/coordinator_test.go | 14 ++++++------- internal/batches/executor/executor.go | 21 ++++++++----------- internal/batches/executor/executor_test.go | 6 ++---- 5 files changed, 20 insertions(+), 26 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 986f8707f0..157af55927 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -428,7 +428,6 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error coord := executor.NewCoordinator( executor.NewCoordinatorOpts{ ExecOpts: executor.NewExecutorOpts{ - Context: ctx, Logger: logManager, RepoArchiveRegistry: archiveRegistry, Creator: workspaceCreator, diff --git a/internal/batches/executor/coordinator.go b/internal/batches/executor/coordinator.go index dcab600296..746ea86d8e 100644 --- a/internal/batches/executor/coordinator.go +++ b/internal/batches/executor/coordinator.go @@ -13,7 +13,7 @@ import ( ) type taskExecutor interface { - Start([]*Task, TaskExecutionUI) + Start(context.Context, []*Task, TaskExecutionUI) Wait() ([]taskResult, error) } @@ -181,7 +181,7 @@ func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batch ui.Start(tasks) // Run executor. - c.exec.Start(tasks, ui) + c.exec.Start(ctx, tasks, ui) results, errs := c.exec.Wait() // Write all step cache results to the cache. diff --git a/internal/batches/executor/coordinator_test.go b/internal/batches/executor/coordinator_test.go index e5b8a6dfe7..53e17c0fab 100644 --- a/internal/batches/executor/coordinator_test.go +++ b/internal/batches/executor/coordinator_test.go @@ -402,7 +402,7 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) { task.Steps[1].Run = `echo "two modified"` // Re-execution should start with the diff produced by steps[0] as the // start state from which steps[1] is then re-executed. - execAndEnsure(t, coord, executor, batchSpec, task, func(t []*Task, teu TaskExecutionUI) {}) + execAndEnsure(t, coord, executor, batchSpec, task, func(ctx context.Context, t []*Task, teu TaskExecutionUI) {}) // Cache now contains old entries, plus another "complete task" entry and // two entries for newly executed steps. assertCacheSize(t, cache, 5) @@ -413,7 +413,7 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) { // Change the 3rd step's definition: task.Steps[2].Run = `echo "three modified"` // Re-execution should use the diff from steps[1] as start state - execAndEnsure(t, coord, executor, batchSpec, task, func(t []*Task, teu TaskExecutionUI) {}) + execAndEnsure(t, coord, executor, batchSpec, task, func(ctx context.Context, t []*Task, teu TaskExecutionUI) {}) // Cache now contains old entries, plus another "complete task" entry and // a single new step entry assertCacheSize(t, cache, 6) @@ -480,8 +480,8 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) { // expectCachedResultForStep returns a function that can be used as a // startCallback on dummyExecutor to assert that the first Task has no cached results. -func assertNoCachedResult(t *testing.T) func([]*Task, TaskExecutionUI) { - return func(tasks []*Task, ui TaskExecutionUI) { +func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) { + return func(_ context.Context, tasks []*Task, ui TaskExecutionUI) { t.Helper() task := tasks[0] @@ -491,7 +491,7 @@ func assertNoCachedResult(t *testing.T) func([]*Task, TaskExecutionUI) { } } -type startCallback func([]*Task, TaskExecutionUI) +type startCallback func(context.Context, []*Task, TaskExecutionUI) var _ TaskExecutionUI = &dummyTaskExecutionUI{} @@ -554,9 +554,9 @@ type dummyExecutor struct { waitErr error } -func (d *dummyExecutor) Start(ts []*Task, ui TaskExecutionUI) { +func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionUI) { if d.startCb != nil { - d.startCb(ts, ui) + d.startCb(ctx, ts, ui) d.startCbCalled = true } // "noop noop noop", the crowd screams diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 9b2b4e28a0..45fb7307c2 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -71,7 +71,6 @@ type NewExecutorOpts struct { FailFast bool BinaryDiffs bool - Context context.Context } type executor struct { @@ -82,33 +81,31 @@ type executor struct { } func NewExecutor(opts NewExecutorOpts) *executor { - p := pool.NewWithResults[*taskResult]().WithMaxGoroutines(opts.Parallelism).WithContext(opts.Context) - if opts.FailFast { - p = p.WithCancelOnError() - } return &executor{ - opts: opts, - + opts: opts, doneEnqueuing: make(chan struct{}), - workPool: p, } } // Start starts the execution of the given Tasks in goroutines, calling the // given taskStatusHandler to update the progress of the tasks. -func (x *executor) Start(tasks []*Task, ui TaskExecutionUI) { +func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) { defer func() { close(x.doneEnqueuing) }() + x.workPool = pool.NewWithResults[*taskResult]().WithMaxGoroutines(x.opts.Parallelism).WithContext(ctx) + if x.opts.FailFast { + x.workPool = x.workPool.WithCancelOnError() + } + for _, task := range tasks { select { - case <-x.opts.Context.Done(): + case <-ctx.Done(): return default: } - t := task x.workPool.Go(func(c context.Context) (*taskResult, error) { - return x.do(c, t, ui) + return x.do(c, task, ui) }) } } diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 51950c27ed..46a5cdb061 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -414,7 +414,6 @@ func TestExecutor_Integration(t *testing.T) { parallelism = 1 } opts := NewExecutorOpts{ - Context: ctx, Creator: cr, RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), Logger: mock.LogNoOpManager{}, @@ -434,7 +433,7 @@ func TestExecutor_Integration(t *testing.T) { executor := NewExecutor(opts) // Run executor - executor.Start(tc.tasks, dummyUI) + executor.Start(ctx, tc.tasks, dummyUI) results, err := executor.Wait() if tc.wantErrInclude == "" { @@ -838,7 +837,6 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) cr, _ := workspace.NewCreator(ctx, "bind", testTempDir, testTempDir, images) // Setup executor executor := NewExecutor(NewExecutorOpts{ - Context: ctx, Creator: cr, RepoArchiveRegistry: repozip.NewArchiveRegistry(client, testTempDir, false), Logger: mock.LogNoOpManager{}, @@ -849,7 +847,7 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) Timeout: 30 * time.Second, }) - executor.Start(tasks, newDummyTaskExecutionUI()) + executor.Start(ctx, tasks, newDummyTaskExecutionUI()) return executor.Wait() } From d65f884a19993a94368903dca78acb2ad8af66e2 Mon Sep 17 00:00:00 2001 From: Michael Bahr Date: Thu, 20 Mar 2025 16:30:31 +0100 Subject: [PATCH 8/8] add delay to stabilize concurrent test --- internal/batches/executor/executor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 46a5cdb061..2df8c21ea0 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -334,6 +334,11 @@ func TestExecutor_Integration(t *testing.T) { // We must fail for the first repository, so that the other repo's work is cancelled in fail fast mode. If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo1.Name), }, + { + // We introduce an artificial way for the second repository, so that it can't complete before the failure of the first one. + Run: `sleep 0.1`, + If: fmt.Sprintf(`${{ eq repository.name %q }}`, testRepo2.Name), + }, {Run: `echo -e "foobar\n" >> README.md`}, }, tasks: []*Task{