Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Exesh/config/coordinator/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ job_factory:
execution_scheduler:
executions_interval: 5s
max_concurrency: 10
execution_retry_after: 15s
execution_retry_after: 300s
worker_pool:
worker_die_after: 10s
artifact_registry:
artifact_ttl: 3m
sender:
brokers:
- 0.0.0.0:29092
topic: exesh.step-updates
topic: exesh.step-updates
2 changes: 1 addition & 1 deletion Exesh/config/coordinator/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ job_factory:
execution_scheduler:
executions_interval: 3s
max_concurrency: 10
execution_retry_after: 15s
execution_retry_after: 300s
worker_pool:
worker_die_after: 10s
artifact_registry:
Expand Down
3 changes: 3 additions & 0 deletions Exesh/example/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
a.checker.out
a.out
out.txt
1 change: 0 additions & 1 deletion Exesh/example/in.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
1 2

5 changes: 4 additions & 1 deletion Exesh/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (dp *dummyOutputProvider) Locate(ctx context.Context, out execution.Output)
return out.GetFile(), func() {}, nil
}

func (dp *dummyOutputProvider) Reserve(ctx context.Context, out execution.Output) (path string, unlock func() error, smth func() error, err error) {
return out.GetFile(), func() error { return nil }, func() error { return nil }, nil
}

func (dp *dummyOutputProvider) Read(ctx context.Context, out execution.Output) (r io.Reader, unlock func(), err error) {
unlock = func() {}
f, err := os.OpenFile(out.GetFile(), os.O_RDONLY, 0o755)
Expand Down Expand Up @@ -124,7 +128,6 @@ func main() {
runJobId, inputs.NewArtifactInput("a.checker.out", checkJobId, workerID),
inputs.NewArtifactInput("correct.txt", checkJobId, workerID),
inputs.NewArtifactInput("out.txt", checkJobId, workerID),
outputs.NewArtifactOutput("verdict.txt", checkJobId),
)))
fmt.Printf("check: %#v\n", checkResult)
}
29 changes: 23 additions & 6 deletions Exesh/internal/runtime/docker/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"archive/tar"
"bytes"
"context"
"errors"
"exesh/internal/runtime"
"fmt"
"io"
"os"
"path/filepath"
"time"

"exesh/internal/runtime"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
Expand Down Expand Up @@ -76,8 +76,9 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe
cpuPolicy(int64(params.Limits.Time) / int64(time.Second))(hostConfig)
memoryPolicy(int64(params.Limits.Memory))(hostConfig)

// we do not know why, but without StdinOnce, without CloseWrite stdin is not closed, and with it - stdout is empty
cr, err := dr.client.ContainerCreate(ctx,
&container.Config{Image: dr.baseImage, Cmd: cmd, OpenStdin: true},
&container.Config{Image: dr.baseImage, Cmd: cmd, OpenStdin: true, StdinOnce: true},
hostConfig,
&network.NetworkingConfig{},
&v1.Platform{OS: "linux", Architecture: "amd64"},
Expand Down Expand Up @@ -144,8 +145,7 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe
if params.Stdin != nil {
go func(r io.Reader) {
_, _ = io.Copy(hjr.Conn, params.Stdin)
// BUG: why the fuck does this "CloseWrite" cause stdout to be empty
// hjr.CloseWrite()
hjr.CloseWrite()
}(params.Stdin)
}

Expand All @@ -154,16 +154,33 @@ func (dr *Runtime) Execute(ctx context.Context, cmd []string, params runtime.Exe
return fmt.Errorf("start container: %w", err)
}

// force larger deadline because the submission may just hang waiting for input
timeout := 10 * time.Second
if params.Limits.Time != 0 {
timeout = 5 * time.Duration(params.Limits.Time)
}
ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var insp container.InspectResponse
for {
insp, err = dr.client.ContainerInspect(ctx, cr.ID)
insp, err = dr.client.ContainerInspect(ctxTimeout, cr.ID)
if err != nil {
return fmt.Errorf("inspect container: %w", err)
}

if !insp.State.Running {
break
}

select {
case <-ctxTimeout.Done():
if errors.Is(ctxTimeout.Err(), context.DeadlineExceeded) {
return runtime.ErrTimeout
}
return ctx.Err()
case <-time.After(1 * time.Second):
}
}

if insp.State.ExitCode == 137 {
Expand Down
Loading