98 changes: 97 additions & 1 deletion agent/integration/job_runner_integration_test.go
@@ -3,13 +3,16 @@ package integration
import (
"context"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/buildkite/agent/v3/agent"
"github.com/buildkite/agent/v3/api"
@@ -73,7 +76,6 @@ func TestPreBootstrapHookScripts(t *testing.T) {
}

for _, tc := range testCases {

t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
@@ -354,3 +356,97 @@ func TestJobRunnerIgnoresPipelineChangesToProtectedVars(t *testing.T) {
t.Errorf("runJob(t, ctx, %v) = %v", testCfg, err)
}
}

func TestChunksIntervalSeconds_ControlsUploadTiming(t *testing.T) {
t.Parallel()

runTestWithInterval := func(t *testing.T, intervalSeconds int) int {
t.Helper()
ctx := context.Background()

var (
chunkCount int
mu sync.Mutex
)

e := createTestAgentEndpoint()
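// Wrap the chunks route so the test can count uploads before delegating to the default chunks handler.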
server := e.server(route{
Method: "POST",
Path: "/jobs/{id}/chunks",
HandlerFunc: func(rw http.ResponseWriter, req *http.Request) {
mu.Lock()
chunkCount++
mu.Unlock()
e.chunksHandler()(rw, req)
},
})
t.Cleanup(server.Close)

j := &api.Job{
ID: defaultJobID,
ChunksMaxSizeBytes: 100_000, // large enough that the log output is never split into multiple chunks
ChunksIntervalSeconds: intervalSeconds,
Env: map[string]string{},
Token: "bkaj_job-token",
}

mb := mockBootstrap(t)
mb.Expect().Once().AndCallFunc(func(c *bintest.Call) {
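// Emit a log line every 100ms for roughly 4 seconds, so several upload intervals elapse at either setting.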
start := time.Now()
for time.Since(start) < 4*time.Second {
fmt.Fprintf(c.Stdout, "Log output at %v\n", time.Now())
time.Sleep(100 * time.Millisecond)
}
c.Exit(0)
})

if err := runJob(t, ctx, testRunJobConfig{
job: j,
server: server,
agentCfg: agent.AgentConfiguration{},
mockBootstrap: mb,
}); err != nil {
t.Fatalf("runJob() error = %v", err)
}

mb.CheckAndClose(t) //nolint:errcheck

mu.Lock()
defer mu.Unlock()

t.Logf("Interval %ds: %d chunks uploaded", intervalSeconds, chunkCount)
return chunkCount
}

t.Run("2s interval should upload fewer chunks than 1s interval", func(t *testing.T) {
var count1s, count2s int

wg := &sync.WaitGroup{}
wg.Add(2)

// each of these runs for ~4 seconds, so run them in parallel to save wall-clock time
go func() {
defer wg.Done()
count1s = runTestWithInterval(t, 1)
}()

go func() {
defer wg.Done()
count2s = runTestWithInterval(t, 2)
}()

wg.Wait()

t.Logf("1s interval: %d chunks, 2s interval: %d chunks", count1s, count2s)

// With a 4s job:
// 1s interval: first chunk + chunks at ~1s, ~2s, ~3s, ~4s = 5 chunks
// 2s interval: first chunk + chunks at ~2s, ~4s = 3 chunks
if count1s != 5 {
t.Errorf("1s interval: got %d chunks, expected 5", count1s)
}
if count2s != 3 {
t.Errorf("2s interval: got %d chunks, expected 3", count2s)
}
})
}
6 changes: 4 additions & 2 deletions agent/run_job.go
@@ -339,7 +339,6 @@ One or more containers never connected to the agent. Perhaps the container image
One or more containers connected to the agent, but then stopped communicating without exiting normally. Perhaps the container was OOM-killed?
`)
}

}

// Collect the finished process' exit status
@@ -495,7 +494,10 @@ func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync
return
}

const processInterval = 1 * time.Second // TODO: make configurable?
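// Default to a 1-second upload interval; a positive ChunksIntervalSeconds supplied by the API overrides it below.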
processInterval := 1 * time.Second
if r.conf.Job.ChunksIntervalSeconds > 0 {
processInterval = time.Duration(r.conf.Job.ChunksIntervalSeconds) * time.Second
}
intervalTicker := time.NewTicker(processInterval)
defer intervalTicker.Stop()
first := make(chan struct{}, 1)
35 changes: 18 additions & 17 deletions api/jobs.go
@@ -9,23 +9,24 @@ import (

// Job represents a Buildkite Agent API Job
type Job struct {
ID string `json:"id,omitempty"`
Endpoint string `json:"endpoint"`
State string `json:"state,omitempty"`
Env map[string]string `json:"env,omitempty"`
Step pipeline.CommandStep `json:"step,omitempty"`
MatrixPermutation pipeline.MatrixPermutation `json:"matrix_permutation,omitempty"`
ChunksMaxSizeBytes uint64 `json:"chunks_max_size_bytes,omitempty"`
LogMaxSizeBytes uint64 `json:"log_max_size_bytes,omitempty"`
Token string `json:"token,omitempty"`
ExitStatus string `json:"exit_status,omitempty"`
Signal string `json:"signal,omitempty"`
SignalReason string `json:"signal_reason,omitempty"`
StartedAt string `json:"started_at,omitempty"`
FinishedAt string `json:"finished_at,omitempty"`
RunnableAt string `json:"runnable_at,omitempty"`
ChunksFailedCount int `json:"chunks_failed_count,omitempty"`
TraceParent string `json:"traceparent"`
ID string `json:"id,omitempty"`
Endpoint string `json:"endpoint"`
State string `json:"state,omitempty"`
Env map[string]string `json:"env,omitempty"`
Step pipeline.CommandStep `json:"step,omitempty"`
MatrixPermutation pipeline.MatrixPermutation `json:"matrix_permutation,omitempty"`
ChunksMaxSizeBytes uint64 `json:"chunks_max_size_bytes,omitempty"`
ChunksIntervalSeconds int `json:"chunks_interval_seconds,omitempty"`
LogMaxSizeBytes uint64 `json:"log_max_size_bytes,omitempty"`
Token string `json:"token,omitempty"`
ExitStatus string `json:"exit_status,omitempty"`
Signal string `json:"signal,omitempty"`
SignalReason string `json:"signal_reason,omitempty"`
StartedAt string `json:"started_at,omitempty"`
FinishedAt string `json:"finished_at,omitempty"`
RunnableAt string `json:"runnable_at,omitempty"`
ChunksFailedCount int `json:"chunks_failed_count,omitempty"`
TraceParent string `json:"traceparent"`
}

type JobState struct {
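For context on how the new ChunksIntervalSeconds field is populated: it decodes from the chunks_interval_seconds key of the job payload returned by the Agent API. A minimal, self-contained sketch (not part of this diff; the payload values below are hypothetical):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/buildkite/agent/v3/api"
)

func main() {
	// Hypothetical job payload; only the keys matching api.Job's struct tags matter here.
	payload := []byte(`{
		"id": "00000000-0000-0000-0000-000000000000",
		"token": "bkaj_example-token",
		"chunks_max_size_bytes": 102400,
		"chunks_interval_seconds": 2
	}`)

	var job api.Job
	if err := json.Unmarshal(payload, &job); err != nil {
		panic(err)
	}

	// With the payload above, the agent flushes log chunks every 2 seconds
	// instead of the default 1 second.
	fmt.Println(job.ChunksIntervalSeconds) // 2
}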