Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions Exesh/cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"exesh/internal/config"
"exesh/internal/factory"
"exesh/internal/pool"
"exesh/internal/provider"
"exesh/internal/provider/providers"
"exesh/internal/provider/providers/adapter"
"exesh/internal/provider/adapter"
"exesh/internal/registry"
schedule "exesh/internal/scheduler"
"exesh/internal/sender"
Expand Down Expand Up @@ -57,24 +55,22 @@ func main() {
return
}

filestorage, err := filestorage.New(log, cfg.FileStorage, mux)
fs, err := filestorage.New(log, cfg.FileStorage, mux)
if err != nil {
log.Error("failed to create filestorage", slog.String("error", err.Error()))
return
}
defer filestorage.Shutdown()

filestorageAdapter := adapter.NewFilestorageAdapter(filestorage)
inputProvider := setupInputProvider(cfg.InputProvider, filestorageAdapter)
defer fs.Shutdown()

workerPool := pool.NewWorkerPool(log, cfg.WorkerPool)
defer workerPool.StopObservers()

artifactRegistry := registry.NewArtifactRegistry(log, cfg.ArtifactRegistry, workerPool)

jobFactory := factory.NewJobFactory(log, cfg.JobFactory, artifactRegistry, inputProvider)
filestorageAdapter := adapter.NewFilestorageAdapter(fs)
executionFactory := factory.NewExecutionFactory(cfg.JobFactory, filestorageAdapter)

messageFactory := factory.NewMessageFactory(log)
messageFactory := factory.NewMessageFactory()
messageSender := sender.NewKafkaSender(log, cfg.Sender, unitOfWork, outboxStorage)
messageSender.Start(ctx)

Expand All @@ -83,13 +79,13 @@ func main() {
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
promCoordReg := prometheus.WrapRegistererWithPrefix("coduels_exesh_coordinator_", promRegistry)
promCoordinatorRegistry := prometheus.WrapRegistererWithPrefix("coduels_exesh_coordinator_", promRegistry)

jobScheduler := schedule.NewJobScheduler(log)
executionScheduler := schedule.NewExecutionScheduler(log, cfg.ExecutionScheduler, unitOfWork, executionStorage,
jobFactory, jobScheduler, messageFactory, messageSender)
executionFactory, artifactRegistry, jobScheduler, messageFactory, messageSender)

err = executionScheduler.RegisterMetrics(promCoordReg)
err = executionScheduler.RegisterMetrics(promCoordinatorRegistry)
if err != nil {
log.Error("could not register metrics from execution scheduler", slog.Any("err", err))
return
Expand Down Expand Up @@ -181,9 +177,3 @@ func setupStorage(log *slog.Logger, cfg config.StorageConfig) (

return unitOfWork, executionStorage, outboxStorage, err
}

func setupInputProvider(cfg config.InputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.InputProvider {
filestorageBucketInputProvider := providers.NewFilestorageBucketInputProvider(filestorageAdapter, cfg.FilestorageBucketTTL)
artifactInputProvider := providers.NewArtifactInputProvider(filestorageAdapter, cfg.ArtifactTTL)
return provider.NewInputProvider(filestorageBucketInputProvider, artifactInputProvider)
}
42 changes: 15 additions & 27 deletions Exesh/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
"exesh/internal/executor"
"exesh/internal/executor/executors"
"exesh/internal/provider"
"exesh/internal/provider/providers"
"exesh/internal/provider/providers/adapter"
"exesh/internal/provider/adapter"
"exesh/internal/runtime/docker"
"exesh/internal/worker"
"fmt"
Expand Down Expand Up @@ -46,23 +45,23 @@ func main() {

mux := chi.NewRouter()

filestorage, err := filestorage.New(log, cfg.FileStorage, mux)
fs, err := filestorage.New(log, cfg.FileStorage, mux)
if err != nil {
log.Error("failed to create filestorage", slog.String("error", err.Error()))
return
}
defer filestorage.Shutdown()
defer fs.Shutdown()

filestorageAdapter := adapter.NewFilestorageAdapter(filestorage)
inputProvider := setupInputProvider(cfg.InputProvider, filestorageAdapter)
outputProvider := setupOutputProvider(cfg.OutputProvider, filestorageAdapter)
filestorageAdapter := adapter.NewFilestorageAdapter(fs)
sourceProvider := provider.NewSourceProvider(cfg.SourceProvider, filestorageAdapter)
outputProvider := provider.NewOutputProvider(cfg.OutputProvider, filestorageAdapter)

jobExecutor, err := setupJobExecutor(log, inputProvider, outputProvider)
jobExecutor, err := setupJobExecutor(log, sourceProvider, outputProvider)
if err != nil {
flog.Fatal(err)
}

worker.NewWorker(log, cfg.Worker, jobExecutor).Start(ctx)
worker.NewWorker(log, cfg.Worker, sourceProvider, jobExecutor).Start(ctx)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
Expand Down Expand Up @@ -121,18 +120,7 @@ func setupLogger(env string) (log *slog.Logger, err error) {
return log, err
}

func setupInputProvider(cfg config.InputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.InputProvider {
filestorageBucketInputProvider := providers.NewFilestorageBucketInputProvider(filestorageAdapter, cfg.FilestorageBucketTTL)
artifactInputProvider := providers.NewArtifactInputProvider(filestorageAdapter, cfg.ArtifactTTL)
return provider.NewInputProvider(filestorageBucketInputProvider, artifactInputProvider)
}

func setupOutputProvider(cfg config.OutputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.OutputProvider {
artifactOutputProvider := providers.NewArtifactOutputProvider(filestorageAdapter, cfg.ArtifactTTL)
return provider.NewOutputProvider(artifactOutputProvider)
}

func setupJobExecutor(log *slog.Logger, inputProvider *provider.InputProvider, outputProvider *provider.OutputProvider) (*executor.JobExecutor, error) {
func setupJobExecutor(log *slog.Logger, sourceProvider *provider.SourceProvider, outputProvider *provider.OutputProvider) (*executor.JobExecutor, error) {
gccRT, err := docker.New(
docker.WithDefaultClient(),
docker.WithBaseImage("gcc"),
Expand All @@ -157,11 +145,11 @@ func setupJobExecutor(log *slog.Logger, inputProvider *provider.InputProvider, o
if err != nil {
return nil, fmt.Errorf("create python runtime: %w", err)
}
compileCppJobExecutor := executors.NewCompileCppJobExecutor(log, inputProvider, outputProvider, gccRT)
compileGoJobExecutor := executors.NewCompileGoJobExecutor(log, inputProvider, outputProvider, goRT)
runCppJobExecutor := executors.NewRunCppJobExecutor(log, inputProvider, outputProvider, gccRT)
runPyJobExecutor := executors.NewRunPyJobExecutor(log, inputProvider, outputProvider, pyRT)
runGoJobExecutor := executors.NewRunGoJobExecutor(log, inputProvider, outputProvider, goRT)
checkCppJobExecutor := executors.NewCheckCppJobExecutor(log, inputProvider, outputProvider, gccRT)
compileCppJobExecutor := executors.NewCompileCppJobExecutor(log, sourceProvider, outputProvider, gccRT)
compileGoJobExecutor := executors.NewCompileGoJobExecutor(log, sourceProvider, outputProvider, goRT)
runCppJobExecutor := executors.NewRunCppJobExecutor(log, sourceProvider, outputProvider, gccRT)
runPyJobExecutor := executors.NewRunPyJobExecutor(log, sourceProvider, outputProvider, pyRT)
runGoJobExecutor := executors.NewRunGoJobExecutor(log, sourceProvider, outputProvider, goRT)
checkCppJobExecutor := executors.NewCheckCppJobExecutor(log, sourceProvider, outputProvider, gccRT)
return executor.NewJobExecutor(compileCppJobExecutor, compileGoJobExecutor, runCppJobExecutor, runPyJobExecutor, runGoJobExecutor, checkCppJobExecutor), nil
}
12 changes: 5 additions & 7 deletions Exesh/config/coordinator/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,17 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
job_factory:
output:
compiled_cpp: bin
compiled_binary: bin
run_output: output
check_verdict: verdict
filestorage_endpoint: http://localhost:5253
source_ttl:
filestorage_bucket: 30m
filestorage_endpoint: http://coordinator:5253
execution_scheduler:
executions_interval: 5s
max_concurrency: 10
execution_retry_after: 300s
execution_retry_after: 30s
worker_pool:
worker_die_after: 10s
artifact_registry:
Expand Down
10 changes: 4 additions & 6 deletions Exesh/config/coordinator/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,17 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
job_factory:
output:
compiled_cpp: bin
compiled_binary: bin
run_output: output
check_verdict: verdict
source_ttl:
filestorage_bucket: 30m
filestorage_endpoint: http://coordinator:5253
execution_scheduler:
executions_interval: 3s
max_concurrency: 10
execution_retry_after: 300s
execution_retry_after: 30s
worker_pool:
worker_die_after: 10s
artifact_registry:
Expand Down
2 changes: 1 addition & 1 deletion Exesh/config/worker-1/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
source_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
output_provider:
Expand Down
2 changes: 1 addition & 1 deletion Exesh/config/worker-1/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
source_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
output_provider:
Expand Down
2 changes: 1 addition & 1 deletion Exesh/config/worker-2/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
source_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
output_provider:
Expand Down
2 changes: 1 addition & 1 deletion Exesh/config/worker-2/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ filestorage:
workers: 1
collector_iterations_delay: 60
worker_iterations_delay: 5
input_provider:
source_provider:
filestorage_bucket_ttl: 15m
artifact_ttl: 5m
output_provider:
Expand Down
56 changes: 28 additions & 28 deletions Exesh/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ package main

import (
"context"
"exesh/internal/domain/execution"
"exesh/internal/domain/execution/inputs"
"exesh/internal/domain/execution/jobs"
"exesh/internal/domain/execution/outputs"
"exesh/internal/domain/execution/input"
"exesh/internal/domain/execution/input/inputs"
"exesh/internal/domain/execution/job"
jobs2 "exesh/internal/domain/execution/job/jobs"
"exesh/internal/executor/executors"
"exesh/internal/runtime/docker"
"fmt"
Expand All @@ -19,7 +19,7 @@ import (

type dummyInputProvider struct{}

func (dp *dummyInputProvider) Create(ctx context.Context, in execution.Input) (w io.Writer, commit, abort func() error, err error) {
func (dp *dummyInputProvider) Create(ctx context.Context, in input.Input) (w io.Writer, commit, abort func() error, err error) {
commit = func() error { return nil }
abort = func() error { return nil }
f, err := os.OpenFile(in.GetFile(), os.O_CREATE|os.O_RDWR, 0o755)
Expand All @@ -31,12 +31,12 @@ func (dp *dummyInputProvider) Create(ctx context.Context, in execution.Input) (w
return f, commit, abort, err
}

func (dp *dummyInputProvider) Locate(ctx context.Context, in execution.Input) (path string, unlock func(), err error) {
func (dp *dummyInputProvider) Locate(ctx context.Context, in input.Input) (path string, unlock func(), err error) {
unlock = func() {}
return in.GetFile(), func() {}, nil
}

func (dp *dummyInputProvider) Read(ctx context.Context, in execution.Input) (r io.Reader, unlock func(), err error) {
func (dp *dummyInputProvider) Read(ctx context.Context, in input.Input) (r io.Reader, unlock func(), err error) {
unlock = func() {}
f, err := os.OpenFile(in.GetFile(), os.O_RDONLY, 0o755)
if err != nil {
Expand All @@ -48,7 +48,7 @@ func (dp *dummyInputProvider) Read(ctx context.Context, in execution.Input) (r i

type dummyOutputProvider struct{}

func (dp *dummyOutputProvider) Create(ctx context.Context, out execution.Output) (w io.Writer, commit, abort func() error, err error) {
func (dp *dummyOutputProvider) Create(ctx context.Context, out output.Output) (w io.Writer, commit, abort func() error, err error) {
commit = func() error { return nil }
abort = func() error { return nil }
f, err := os.OpenFile(out.GetFile(), os.O_CREATE|os.O_RDWR, 0o755)
Expand All @@ -60,16 +60,16 @@ func (dp *dummyOutputProvider) Create(ctx context.Context, out execution.Output)
return f, commit, abort, err
}

func (dp *dummyOutputProvider) Locate(ctx context.Context, out execution.Output) (path string, unlock func(), err error) {
func (dp *dummyOutputProvider) Locate(ctx context.Context, out output.Output) (path string, unlock func(), err error) {
unlock = func() {}
return out.GetFile(), func() {}, nil
}

func (dp *dummyOutputProvider) Reserve(ctx context.Context, out execution.Output) (path string, unlock func() error, smth func() error, err error) {
func (dp *dummyOutputProvider) Reserve(ctx context.Context, out output.Output) (path string, unlock func() error, smth func() error, err error) {
return out.GetFile(), func() error { return nil }, func() error { return nil }, nil
}

func (dp *dummyOutputProvider) Read(ctx context.Context, out execution.Output) (r io.Reader, unlock func(), err error) {
func (dp *dummyOutputProvider) Read(ctx context.Context, out output.Output) (r io.Reader, unlock func(), err error) {
unlock = func() {}
f, err := os.OpenFile(out.GetFile(), os.O_RDONLY, 0o755)
if err != nil {
Expand All @@ -88,9 +88,9 @@ func Ref[T any](t T) *T {
}

func main() {
compileJobId := execution.JobID([]byte("1234567890123456789012345678901234567890"))
runJobId := execution.JobID([]byte("0123456789012345678901234567890123456789"))
checkJobId := execution.JobID([]byte("9012345678901234567890123456789012345678"))
compileJobId := job.JobID([]byte("1234567890123456789012345678901234567890"))
runJobId := job.JobID([]byte("0123456789012345678901234567890123456789"))
checkJobId := job.JobID([]byte("9012345678901234567890123456789012345678"))
workerID := "worker-id"
rt, err := docker.New(
docker.WithDefaultClient(),
Expand All @@ -102,32 +102,32 @@ func main() {
}
compileExecutor := executors.NewCompileCppJobExecutor(slog.Default(), &dummyInputProvider{}, &dummyOutputProvider{}, rt)

compilationResult := compileExecutor.Execute(context.Background(), Ref(jobs.NewCompileCppJob(
compilationResult := compileExecutor.Execute(context.Background(), Ref(jobs2.NewCompileCppJob(
compileJobId,
inputs.NewArtifactInput("main.cpp", compileJobId, workerID),
outputs.NewArtifactOutput("a.out", compileJobId),
inputs.NewArtifactInput("main.cpp", compileJobId),
output.NewArtifactOutput("a.out", compileJobId),
)))
fmt.Printf("compile: %#v\n", compilationResult)

checkerCompilationResult := compileExecutor.Execute(context.Background(), Ref(jobs.NewCompileCppJob(
checkerCompilationResult := compileExecutor.Execute(context.Background(), Ref(jobs2.NewCompileCppJob(
compileJobId,
inputs.NewArtifactInput("checker.cpp", compileJobId, workerID),
outputs.NewArtifactOutput("a.checker.out", compileJobId),
inputs.NewArtifactInput("checker.cpp", compileJobId),
output.NewArtifactOutput("a.checker.out", compileJobId),
)))
fmt.Printf("compile checker: %#v\n", checkerCompilationResult)

runExecutor := executors.NewRunCppJobExecutor(slog.Default(), &dummyInputProvider{}, &dummyOutputProvider{}, rt)
runResult := runExecutor.Execute(context.Background(), Ref(jobs.NewRunCppJob(
runJobId, inputs.NewArtifactInput("a.out", runJobId, workerID),
inputs.NewArtifactInput("in.txt", runJobId, workerID),
outputs.NewArtifactOutput("out.txt", runJobId), 0, 0, true)))
runResult := runExecutor.Execute(context.Background(), Ref(jobs2.NewRunCppJob(
runJobId, inputs.NewArtifactInput("a.out", runJobId),
inputs.NewArtifactInput("in.txt", runJobId),
output.NewArtifactOutput("out.txt", runJobId), 0, 0, true)))
fmt.Printf("run: %#v\n", runResult)

checkExecutor := executors.NewCheckCppJobExecutor(slog.Default(), &dummyInputProvider{}, &dummyOutputProvider{}, rt)
checkResult := checkExecutor.Execute(context.Background(), Ref(jobs.NewCheckCppJob(
runJobId, inputs.NewArtifactInput("a.checker.out", checkJobId, workerID),
inputs.NewArtifactInput("correct.txt", checkJobId, workerID),
inputs.NewArtifactInput("out.txt", checkJobId, workerID),
checkResult := checkExecutor.Execute(context.Background(), Ref(jobs2.NewCheckCppJob(
runJobId, inputs.NewArtifactInput("a.checker.out", checkJobId),
inputs.NewArtifactInput("correct.txt", checkJobId),
inputs.NewArtifactInput("out.txt", checkJobId),
)))
fmt.Printf("check: %#v\n", checkResult)
}
21 changes: 3 additions & 18 deletions Exesh/internal/api/execute/api.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,19 @@
package execute

import (
"encoding/json"
"exesh/internal/api"
"exesh/internal/domain/execution"
"exesh/internal/domain/execution/steps"
"exesh/internal/domain/execution/source/sources"
)

type (
Request struct {
Steps []execution.Step `json:"steps"`
Sources sources.Definitions `json:"sources"`
Stages execution.StageDefinitions `json:"stages"`
}

Response struct {
api.Response
ExecutionID *execution.ID `json:"execution_id,omitempty"`
}
)

func (r *Request) UnmarshalJSON(data []byte) error {
req := struct {
Steps json.RawMessage `json:"steps"`
}{}
if err := json.Unmarshal(data, &req); err != nil {
return err
}
var err error
r.Steps, err = steps.UnmarshalStepsJSON(req.Steps)
if err != nil {
return err
}
return nil
}
Loading
Loading