From 77d25f8c7b486225eb04aaab7ea48a847b2c5d01 Mon Sep 17 00:00:00 2001 From: divancode Date: Mon, 16 Feb 2026 10:38:37 +0300 Subject: [PATCH 1/2] chore(exesh): start succ stage earlier --- Exesh/config/worker-1/docker.yml | 4 +-- Exesh/config/worker-2/docker.yml | 4 +-- Exesh/internal/domain/execution/graph.go | 36 ++++++++++++++----- .../internal/scheduler/execution_scheduler.go | 9 +++-- Exesh/internal/worker/worker.go | 3 -- 5 files changed, 38 insertions(+), 18 deletions(-) diff --git a/Exesh/config/worker-1/docker.yml b/Exesh/config/worker-1/docker.yml index 4ac83391..1e4bf214 100644 --- a/Exesh/config/worker-1/docker.yml +++ b/Exesh/config/worker-1/docker.yml @@ -17,5 +17,5 @@ worker: id: http://worker-1:5254 free_slots: 2 coordinator_endpoint: http://coordinator:5253 - heartbeat_delay: 3s - worker_delay: 1s + heartbeat_delay: 100ms + worker_delay: 100ms diff --git a/Exesh/config/worker-2/docker.yml b/Exesh/config/worker-2/docker.yml index 5e3b4397..496ea9aa 100644 --- a/Exesh/config/worker-2/docker.yml +++ b/Exesh/config/worker-2/docker.yml @@ -17,5 +17,5 @@ worker: id: http://worker-2:5255 free_slots: 2 coordinator_endpoint: http://coordinator:5253 - heartbeat_delay: 3s - worker_delay: 1s + heartbeat_delay: 100ms + worker_delay: 100ms diff --git a/Exesh/internal/domain/execution/graph.go b/Exesh/internal/domain/execution/graph.go index 5ce25395..1a2176db 100644 --- a/Exesh/internal/domain/execution/graph.go +++ b/Exesh/internal/domain/execution/graph.go @@ -17,6 +17,7 @@ type graph struct { succJobs map[job.ID][]jobs.Job doneJobDeps map[job.ID]int + isActive map[StageName]bool activeStages []*Stage toPick map[StageName][]jobs.Job @@ -36,6 +37,7 @@ func newGraph(stages []*Stage) *graph { succJobs: make(map[job.ID][]jobs.Job), doneJobDeps: make(map[job.ID]int), + isActive: make(map[StageName]bool), activeStages: make([]*Stage, 0), toPick: make(map[StageName][]jobs.Job), @@ -77,6 +79,7 @@ func newGraph(stages []*Stage) *graph { g.doneJobs[stage.Name] = 0 if len(stage.Deps) == 0 { + g.isActive[stage.Name] = true g.activeStages = append(g.activeStages, stage) } } @@ -117,6 +120,9 @@ func (g *graph) doneJob(jobID job.ID, jobStatus job.Status) { } if jobStatus != jb.GetSuccessStatus() { + for _, activeStage := range g.activeStages { + g.isActive[activeStage.Name] = false + } g.activeStages = make([]*Stage, 0) return } @@ -130,22 +136,32 @@ func (g *graph) doneJob(jobID job.ID, jobStatus job.Status) { } } - if g.doneJobs[stage.Name] == g.totalJobs[stage.Name] { - activeStages := make([]*Stage, 0) - for i := range g.activeStages { - if g.activeStages[i].Name != stage.Name { - activeStages = append(activeStages, g.activeStages[i]) + doneJobs := g.doneJobs[stage.Name] + totalJobs := g.totalJobs[stage.Name] + isFinished := doneJobs == totalJobs + if isFinished { + g.isActive[stage.Name] = false + activeStages := make([]*Stage, 0, len(g.activeStages)-1) + for _, activeStage := range g.activeStages { + if activeStage.Name != stage.Name { + activeStages = append(activeStages, activeStage) } } + g.activeStages = activeStages + } + if g.canStartSuccStages(doneJobs, totalJobs) { for _, succStage := range g.succStages[stage.Name] { + if g.isActive[succStage.Name] { + continue + } + g.doneStageDeps[succStage.Name]++ if g.doneStageDeps[succStage.Name] == len(succStage.Deps) { - activeStages = append(activeStages, succStage) + g.isActive[succStage.Name] = true + g.activeStages = append(g.activeStages, succStage) } } - - g.activeStages = activeStages } } @@ -155,3 +171,7 @@ func (g *graph) isDone() bool { return len(g.activeStages) == 0 } + +func (g *graph) canStartSuccStages(doneJobs int, totalJobs int) bool { + return 4*doneJobs >= 3*totalJobs +} diff --git a/Exesh/internal/scheduler/execution_scheduler.go b/Exesh/internal/scheduler/execution_scheduler.go index 762eae63..63b8134e 100644 --- a/Exesh/internal/scheduler/execution_scheduler.go +++ b/Exesh/internal/scheduler/execution_scheduler.go @@ -12,11 +12,12 @@ import ( "exesh/internal/domain/execution/result/results" "exesh/internal/domain/execution/source/sources" "fmt" - "github.com/DIvanCode/filestorage/pkg/bucket" "log/slog" "sync/atomic" "time" + "github.com/DIvanCode/filestorage/pkg/bucket" + "github.com/prometheus/client_golang/prometheus" ) @@ -141,7 +142,10 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) { continue } - s.log.Debug("begin execution scheduler loop", slog.Int("now_executions", s.getNowExecutions())) + nowExecutions := s.getNowExecutions() + if nowExecutions > 0 { + s.log.Debug("begin execution scheduler loop", slog.Int("now_executions", s.getNowExecutions())) + } s.changeNowExecutions(+1) if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { @@ -151,7 +155,6 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) { } if def == nil { s.changeNowExecutions(-1) - s.log.Debug("no executions to schedule") return nil } diff --git a/Exesh/internal/worker/worker.go b/Exesh/internal/worker/worker.go index c65bb01f..d2a8045d 100644 --- a/Exesh/internal/worker/worker.go +++ b/Exesh/internal/worker/worker.go @@ -86,8 +86,6 @@ func (w *Worker) runHeartbeat(ctx context.Context) { continue } - w.log.Debug("begin heartbeat loop") - w.mu.Lock() doneJobs := make([]results.Result, len(w.doneJobs)) @@ -137,7 +135,6 @@ func (w *Worker) runWorker(ctx context.Context) { job := w.jobs.Dequeue() if job == nil { - w.log.Debug("skip worker loop (no jobs to do)") w.changeFreeSlots(+1) continue } From 687929dd629afea270c464ecf57cab63bd9d264b Mon Sep 17 00:00:00 2001 From: Ivan Dobrynin Date: Tue, 24 Feb 2026 13:05:03 +0300 Subject: [PATCH 2/2] fix contracts --- .../message/messages/compile_job_message.go | 14 +++++++------- .../execution/result/results/compile_result.go | 4 ++-- Exesh/internal/factory/message_factory.go | 6 +++++- Exesh/internal/sender/message_sender.go | 3 +-- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/Exesh/internal/domain/execution/message/messages/compile_job_message.go b/Exesh/internal/domain/execution/message/messages/compile_job_message.go index 3205c1c7..dd7c5924 100644 --- a/Exesh/internal/domain/execution/message/messages/compile_job_message.go +++ b/Exesh/internal/domain/execution/message/messages/compile_job_message.go @@ -8,9 +8,9 @@ import ( type CompileJobMessage struct { message.Details - JobName job.DefinitionName `json:"job"` - CompileStatus job.Status `json:"status"` - Error string `json:"error,omitempty"` + JobName job.DefinitionName `json:"job"` + CompileStatus job.Status `json:"status"` + CompilationError *string `json:"compilation_error,omitempty"` } func NewCompileJobMessageOk( @@ -32,7 +32,7 @@ func NewCompileJobMessageOk( func NewCompileJobMessageError( executionID execution.ID, jobName job.DefinitionName, - err string, + compilationError string, ) Message { return Message{ &CompileJobMessage{ @@ -40,9 +40,9 @@ func NewCompileJobMessageError( ExecutionID: executionID, Type: message.CompileJob, }, - JobName: jobName, - CompileStatus: job.StatusCE, - Error: err, + JobName: jobName, + CompileStatus: job.StatusCE, + CompilationError: &compilationError, }, } } diff --git a/Exesh/internal/domain/execution/result/results/compile_result.go b/Exesh/internal/domain/execution/result/results/compile_result.go index cd253221..ab6b5d7c 100644 --- a/Exesh/internal/domain/execution/result/results/compile_result.go +++ b/Exesh/internal/domain/execution/result/results/compile_result.go @@ -8,7 +8,7 @@ import ( type CompileResult struct { result.Details - CompilationError string `json:"compilation_error"` + CompilationError *string `json:"compilation_error,omitempty"` } func NewCompileResultOK(jobID job.ID) Result { @@ -33,7 +33,7 @@ func NewCompileResultCE(jobID job.ID, compilationError string) Result { Status: job.StatusCE, DoneAt: time.Now(), }, - CompilationError: compilationError, + CompilationError: &compilationError, }, } } diff --git a/Exesh/internal/factory/message_factory.go b/Exesh/internal/factory/message_factory.go index 7488d54e..1325740e 100644 --- a/Exesh/internal/factory/message_factory.go +++ b/Exesh/internal/factory/message_factory.go @@ -33,7 +33,11 @@ func (f *MessageFactory) CreateForJob( case job.StatusOK: msg = messages.NewCompileJobMessageOk(executionID, jobName) case job.StatusCE: - msg = messages.NewCompileJobMessageError(executionID, jobName, typedRes.CompilationError) + if typedRes.CompilationError != nil { + msg = messages.NewCompileJobMessageError(executionID, jobName, *typedRes.CompilationError) + } else { + msg = messages.NewCompileJobMessageError(executionID, jobName, "unknown error") + } default: return msg, fmt.Errorf("unknown compile status: %s", typedRes.Status) } diff --git a/Exesh/internal/sender/message_sender.go b/Exesh/internal/sender/message_sender.go index 629e565c..57cfdb25 100644 --- a/Exesh/internal/sender/message_sender.go +++ b/Exesh/internal/sender/message_sender.go @@ -134,8 +134,7 @@ func (s *KafkaSender) process(ctx context.Context) error { } s.log.Debug("send to kafka", slog.Int64("outbox_id", ox.ID)) - err = s.writer.WriteMessages(ctx, message) - if err != nil { + if err = s.writer.WriteMessages(ctx, message); err != nil { failedAt := time.Now() ox.FailedAt = &failedAt ox.FailedTries++