Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Exesh/config/worker-1/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ worker:
id: http://worker-1:5254
free_slots: 2
coordinator_endpoint: http://coordinator:5253
heartbeat_delay: 3s
worker_delay: 1s
heartbeat_delay: 100ms
worker_delay: 100ms
4 changes: 2 additions & 2 deletions Exesh/config/worker-2/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ worker:
id: http://worker-2:5255
free_slots: 2
coordinator_endpoint: http://coordinator:5253
heartbeat_delay: 3s
worker_delay: 1s
heartbeat_delay: 100ms
worker_delay: 100ms
36 changes: 28 additions & 8 deletions Exesh/internal/domain/execution/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type graph struct {
succJobs map[job.ID][]jobs.Job
doneJobDeps map[job.ID]int

isActive map[StageName]bool
activeStages []*Stage
toPick map[StageName][]jobs.Job

Expand All @@ -36,6 +37,7 @@ func newGraph(stages []*Stage) *graph {
succJobs: make(map[job.ID][]jobs.Job),
doneJobDeps: make(map[job.ID]int),

isActive: make(map[StageName]bool),
activeStages: make([]*Stage, 0),
toPick: make(map[StageName][]jobs.Job),

Expand Down Expand Up @@ -77,6 +79,7 @@ func newGraph(stages []*Stage) *graph {
g.doneJobs[stage.Name] = 0

if len(stage.Deps) == 0 {
g.isActive[stage.Name] = true
g.activeStages = append(g.activeStages, stage)
}
}
Expand Down Expand Up @@ -117,6 +120,9 @@ func (g *graph) doneJob(jobID job.ID, jobStatus job.Status) {
}

if jobStatus != jb.GetSuccessStatus() {
for _, activeStage := range g.activeStages {
g.isActive[activeStage.Name] = false
}
g.activeStages = make([]*Stage, 0)
return
}
Expand All @@ -130,22 +136,32 @@ func (g *graph) doneJob(jobID job.ID, jobStatus job.Status) {
}
}

if g.doneJobs[stage.Name] == g.totalJobs[stage.Name] {
activeStages := make([]*Stage, 0)
for i := range g.activeStages {
if g.activeStages[i].Name != stage.Name {
activeStages = append(activeStages, g.activeStages[i])
doneJobs := g.doneJobs[stage.Name]
totalJobs := g.totalJobs[stage.Name]
isFinished := doneJobs == totalJobs
if isFinished {
g.isActive[stage.Name] = false
activeStages := make([]*Stage, 0, len(g.activeStages)-1)
for _, activeStage := range g.activeStages {
if activeStage.Name != stage.Name {
activeStages = append(activeStages, activeStage)
}
}
g.activeStages = activeStages
}

if g.canStartSuccStages(doneJobs, totalJobs) {
for _, succStage := range g.succStages[stage.Name] {
if g.isActive[succStage.Name] {
continue
}

g.doneStageDeps[succStage.Name]++
if g.doneStageDeps[succStage.Name] == len(succStage.Deps) {
activeStages = append(activeStages, succStage)
g.isActive[succStage.Name] = true
g.activeStages = append(g.activeStages, succStage)
}
}

g.activeStages = activeStages
}
}

Expand All @@ -155,3 +171,7 @@ func (g *graph) isDone() bool {

return len(g.activeStages) == 0
}

func (g *graph) canStartSuccStages(doneJobs int, totalJobs int) bool {
return 4*doneJobs >= 3*totalJobs
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

type CompileJobMessage struct {
message.Details
JobName job.DefinitionName `json:"job"`
CompileStatus job.Status `json:"status"`
Error string `json:"error,omitempty"`
JobName job.DefinitionName `json:"job"`
CompileStatus job.Status `json:"status"`
CompilationError *string `json:"compilation_error,omitempty"`
}

func NewCompileJobMessageOk(
Expand All @@ -32,17 +32,17 @@ func NewCompileJobMessageOk(
func NewCompileJobMessageError(
executionID execution.ID,
jobName job.DefinitionName,
err string,
compilationError string,
) Message {
return Message{
&CompileJobMessage{
Details: message.Details{
ExecutionID: executionID,
Type: message.CompileJob,
},
JobName: jobName,
CompileStatus: job.StatusCE,
Error: err,
JobName: jobName,
CompileStatus: job.StatusCE,
CompilationError: &compilationError,
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type CompileResult struct {
result.Details
CompilationError string `json:"compilation_error"`
CompilationError *string `json:"compilation_error,omitempty"`
}

func NewCompileResultOK(jobID job.ID) Result {
Expand All @@ -33,7 +33,7 @@ func NewCompileResultCE(jobID job.ID, compilationError string) Result {
Status: job.StatusCE,
DoneAt: time.Now(),
},
CompilationError: compilationError,
CompilationError: &compilationError,
},
}
}
Expand Down
6 changes: 5 additions & 1 deletion Exesh/internal/factory/message_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func (f *MessageFactory) CreateForJob(
case job.StatusOK:
msg = messages.NewCompileJobMessageOk(executionID, jobName)
case job.StatusCE:
msg = messages.NewCompileJobMessageError(executionID, jobName, typedRes.CompilationError)
if typedRes.CompilationError != nil {
msg = messages.NewCompileJobMessageError(executionID, jobName, *typedRes.CompilationError)
} else {
msg = messages.NewCompileJobMessageError(executionID, jobName, "unknown error")
}
default:
return msg, fmt.Errorf("unknown compile status: %s", typedRes.Status)
}
Expand Down
9 changes: 6 additions & 3 deletions Exesh/internal/scheduler/execution_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"exesh/internal/domain/execution/result/results"
"exesh/internal/domain/execution/source/sources"
"fmt"
"github.com/DIvanCode/filestorage/pkg/bucket"
"log/slog"
"sync/atomic"
"time"

"github.com/DIvanCode/filestorage/pkg/bucket"

"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -141,7 +142,10 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) {
continue
}

s.log.Debug("begin execution scheduler loop", slog.Int("now_executions", s.getNowExecutions()))
nowExecutions := s.getNowExecutions()
if nowExecutions > 0 {
s.log.Debug("begin execution scheduler loop", slog.Int("now_executions", s.getNowExecutions()))
}

s.changeNowExecutions(+1)
if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error {
Expand All @@ -151,7 +155,6 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) {
}
if def == nil {
s.changeNowExecutions(-1)
s.log.Debug("no executions to schedule")
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions Exesh/internal/sender/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ func (s *KafkaSender) process(ctx context.Context) error {
}

s.log.Debug("send to kafka", slog.Int64("outbox_id", ox.ID))
err = s.writer.WriteMessages(ctx, message)
if err != nil {
if err = s.writer.WriteMessages(ctx, message); err != nil {
failedAt := time.Now()
ox.FailedAt = &failedAt
ox.FailedTries++
Expand Down
3 changes: 0 additions & 3 deletions Exesh/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ func (w *Worker) runHeartbeat(ctx context.Context) {
continue
}

w.log.Debug("begin heartbeat loop")

w.mu.Lock()

doneJobs := make([]results.Result, len(w.doneJobs))
Expand Down Expand Up @@ -137,7 +135,6 @@ func (w *Worker) runWorker(ctx context.Context) {

job := w.jobs.Dequeue()
if job == nil {
w.log.Debug("skip worker loop (no jobs to do)")
w.changeFreeSlots(+1)
continue
}
Expand Down
Loading