diff --git a/Exesh/config/coordinator/dev.yml b/Exesh/config/coordinator/dev.yml index 3b6a2e06..4b5db66f 100644 --- a/Exesh/config/coordinator/dev.yml +++ b/Exesh/config/coordinator/dev.yml @@ -22,7 +22,7 @@ job_factory: execution_scheduler: executions_interval: 5s max_concurrency: 10 - execution_retry_after: 15s + execution_retry_after: 300s worker_pool: worker_die_after: 10s artifact_registry: @@ -30,4 +30,4 @@ artifact_registry: sender: brokers: - 0.0.0.0:29092 - topic: exesh.step-updates \ No newline at end of file + topic: exesh.step-updates diff --git a/Exesh/config/coordinator/docker.yml b/Exesh/config/coordinator/docker.yml index 4f7eed68..6ab4802d 100644 --- a/Exesh/config/coordinator/docker.yml +++ b/Exesh/config/coordinator/docker.yml @@ -23,7 +23,7 @@ job_factory: execution_scheduler: executions_interval: 3s max_concurrency: 10 - execution_retry_after: 15s + execution_retry_after: 300s worker_pool: worker_die_after: 10s artifact_registry: diff --git a/Exesh/example/.gitignore b/Exesh/example/.gitignore new file mode 100644 index 00000000..ff966fb8 --- /dev/null +++ b/Exesh/example/.gitignore @@ -0,0 +1,3 @@ +a.checker.out +a.out +out.txt diff --git a/Exesh/example/in.txt b/Exesh/example/in.txt index ccb07bfe..8d04f961 100644 --- a/Exesh/example/in.txt +++ b/Exesh/example/in.txt @@ -1,2 +1 @@ 1 2 - diff --git a/Exesh/example/main.go b/Exesh/example/main.go index 90cac8cc..f4123871 100644 --- a/Exesh/example/main.go +++ b/Exesh/example/main.go @@ -65,6 +65,10 @@ func (dp *dummyOutputProvider) Locate(ctx context.Context, out execution.Output) return out.GetFile(), func() {}, nil } +func (dp *dummyOutputProvider) Reserve(ctx context.Context, out execution.Output) (path string, unlock func() error, smth func() error, err error) { + return out.GetFile(), func() error { return nil }, func() error { return nil }, nil +} + func (dp *dummyOutputProvider) Read(ctx context.Context, out execution.Output) (r io.Reader, unlock func(), err error) { unlock = func() {} f, err := os.OpenFile(out.GetFile(), os.O_RDONLY, 0o755) @@ -124,7 +128,6 @@ func main() { runJobId, inputs.NewArtifactInput("a.checker.out", checkJobId, workerID), inputs.NewArtifactInput("correct.txt", checkJobId, workerID), inputs.NewArtifactInput("out.txt", checkJobId, workerID), - outputs.NewArtifactOutput("verdict.txt", checkJobId), ))) fmt.Printf("check: %#v\n", checkResult) } diff --git a/Exesh/internal/runtime/docker/runtime.go b/Exesh/internal/runtime/docker/runtime.go index 4ee237d4..d41e2577 100644 --- a/Exesh/internal/runtime/docker/runtime.go +++ b/Exesh/internal/runtime/docker/runtime.go @@ -4,14 +4,14 @@ import ( "archive/tar" "bytes" "context" + "errors" + "exesh/internal/runtime" "fmt" "io" "os" "path/filepath" "time" - "exesh/internal/runtime" - "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" @@ -76,8 +76,9 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe cpuPolicy(int64(params.Limits.Time) / int64(time.Second))(hostConfig) memoryPolicy(int64(params.Limits.Memory))(hostConfig) + // we do not know why, but without StdinOnce, without CloseWrite stdin is not closed, and with it - stdout is empty cr, err := dr.client.ContainerCreate(ctx, - &container.Config{Image: dr.baseImage, Cmd: cmd, OpenStdin: true}, + &container.Config{Image: dr.baseImage, Cmd: cmd, OpenStdin: true, StdinOnce: true}, hostConfig, &network.NetworkingConfig{}, &v1.Platform{OS: "linux", Architecture: "amd64"}, @@ -144,8 +145,7 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe if params.Stdin != nil { go func(r io.Reader) { _, _ = io.Copy(hjr.Conn, params.Stdin) - // BUG: why the fuck does this "CloseWrite" cause stdout to be empty - // hjr.CloseWrite() + hjr.CloseWrite() }(params.Stdin) } @@ -154,9 +154,17 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe return fmt.Errorf("start container: %w", err) } + // force larger deadline because the submission may just hang waiting for input + timeout := 10 * time.Second + if params.Limits.Time != 0 { + timeout = 5 * time.Duration(params.Limits.Time) + } + ctxTimeout, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + var insp container.InspectResponse for { - insp, err = dr.client.ContainerInspect(ctx, cr.ID) + insp, err = dr.client.ContainerInspect(ctxTimeout, cr.ID) if err != nil { return fmt.Errorf("inspect container: %w", err) } @@ -164,6 +172,15 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe if !insp.State.Running { break } + + select { + case <-ctxTimeout.Done(): + if errors.Is(ctxTimeout.Err(), context.DeadlineExceeded) { + return runtime.ErrTimeout + } + return ctx.Err() + case <-time.After(1 * time.Second): + } } if insp.State.ExitCode == 137 {