Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Exesh/cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func main() {

messageFactory := factory.NewMessageFactory(log)
messageSender := sender.NewKafkaSender(log, cfg.Sender)
messageSender.Start(ctx)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
Expand Down
12 changes: 6 additions & 6 deletions Exesh/internal/factory/message_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func NewMessageFactory(log *slog.Logger) *MessageFactory {
}
}

func (f *MessageFactory) CreateExecutionStarted(execCtx *execution.Context) (execution.Message, error) {
return messages.NewStartExecutionMessage(execCtx.ExecutionID), nil
func (f *MessageFactory) CreateExecutionStarted(execCtx *execution.Context) execution.Message {
return messages.NewStartExecutionMessage(execCtx.ExecutionID)
}

func (f *MessageFactory) CreateForStep(execCtx *execution.Context, step execution.Step, result execution.Result) (execution.Message, error) {
Expand Down Expand Up @@ -48,10 +48,10 @@ func (f *MessageFactory) CreateForStep(execCtx *execution.Context, step executio
}
}

func (f *MessageFactory) CreateExecutionFinished(execCtx *execution.Context) (execution.Message, error) {
return messages.NewFinishExecutionMessage(execCtx.ExecutionID), nil
func (f *MessageFactory) CreateExecutionFinished(execCtx *execution.Context) execution.Message {
return messages.NewFinishExecutionMessage(execCtx.ExecutionID)
}

func (f *MessageFactory) CreateExecutionFinishedError(execCtx *execution.Context, err string) (execution.Message, error) {
return messages.NewFinishExecutionMessageError(execCtx.ExecutionID, err), nil
func (f *MessageFactory) CreateExecutionFinishedError(execCtx *execution.Context, err string) execution.Message {
return messages.NewFinishExecutionMessageError(execCtx.ExecutionID, err)
}
13 changes: 9 additions & 4 deletions Exesh/internal/pool/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func NewWorkerPool(log *slog.Logger, cfg config.WorkerPoolConfig) *WorkerPool {
log: log,
cfg: cfg,

mu: sync.Mutex{},
mu: sync.Mutex{},
heartbeats: make(map[string]chan any),
stop: false,
stop: false,
}
}

Expand Down Expand Up @@ -70,11 +70,13 @@ func (p *WorkerPool) createWorker(workerID string) {
func (p *WorkerPool) runObserver(workerID string) {
for {
p.mu.Lock()
stop := p.stop
heartbeat, ok := p.heartbeats[workerID]
if p.stop || !ok {
p.mu.Unlock()

if stop || !ok {
break
}
p.mu.Unlock()

timer := time.NewTicker(p.cfg.WorkerDieAfter)

Expand All @@ -88,5 +90,8 @@ func (p *WorkerPool) runObserver(workerID string) {
}

// deleteWorker removes the heartbeat entry registered under workerID.
// The map is mutated while holding p.mu; removing an unknown ID is a no-op.
func (p *WorkerPool) deleteWorker(workerID string) {
	p.mu.Lock()
	delete(p.heartbeats, workerID)
	p.mu.Unlock()
}
36 changes: 13 additions & 23 deletions Exesh/internal/scheduler/execution_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ type (
}

// messageFactory builds the execution.Message values the scheduler emits.
// Only CreateForStep can fail; the lifecycle constructors return the
// message directly.
messageFactory interface {
	CreateExecutionStarted(*execution.Context) execution.Message
	CreateForStep(*execution.Context, execution.Step, execution.Result) (execution.Message, error)
	CreateExecutionFinished(*execution.Context) execution.Message
	CreateExecutionFinishedError(*execution.Context, string) execution.Message
}

// messageSender accepts a message for delivery. Send takes no context and
// returns no error, so the scheduler cannot observe delivery failures
// through this interface.
messageSender interface {
	Send(execution.Message)
}
)

Expand Down Expand Up @@ -172,16 +172,11 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error {
func (s *ExecutionScheduler) scheduleExecution(ctx context.Context, execCtx *execution.Context) error {
s.log.Info("schedule execution", slog.String("execution_id", execCtx.ExecutionID.String()))

msg, err := s.messageFactory.CreateExecutionStarted(execCtx)
if err != nil {
return fmt.Errorf("failed to create execution started message: %w", err)
}
if err = s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send %s message: %w", msg.GetType(), err)
}
msg := s.messageFactory.CreateExecutionStarted(execCtx)
s.messageSender.Send(msg)

for _, step := range execCtx.PickSteps() {
if err = s.scheduleStep(ctx, execCtx, step); err != nil {
if err := s.scheduleStep(ctx, execCtx, step); err != nil {
return err
}
}
Expand Down Expand Up @@ -266,9 +261,8 @@ func (s *ExecutionScheduler) doneStep(
if err != nil {
return fmt.Errorf("failed to create message for step: %w", err)
}
if err = s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send message for step: %w", err)
}

s.messageSender.Send(msg)

execCtx.DoneStep(step.GetName())

Expand Down Expand Up @@ -342,17 +336,13 @@ func (s *ExecutionScheduler) finishExecution(

var msg execution.Message
if execError == nil {
msg, err = s.messageFactory.CreateExecutionFinished(execCtx)
msg = s.messageFactory.CreateExecutionFinished(execCtx)
} else {
msg, err = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error())
}
if err != nil {
return fmt.Errorf("failed to create execution finished message: %w", err)
}
if err = s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send execution finished message: %w", err)
msg = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error())
}

s.messageSender.Send(msg)

e.SetFinished(time.Now())

if err = s.executionStorage.Save(ctx, *e); err != nil {
Expand Down
95 changes: 77 additions & 18 deletions Exesh/internal/sender/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@ import (
"encoding/json"
"exesh/internal/config"
"exesh/internal/domain/execution"
"fmt"
"exesh/internal/lib/queue"
"log/slog"
"math"
"sync"
"time"

"github.com/segmentio/kafka-go"
)

type KafkaSender struct {
log *slog.Logger
log *slog.Logger

messages queue.Queue[execution.Message]
mu sync.Mutex

writer *kafka.Writer
}

Expand All @@ -24,27 +31,79 @@ func NewKafkaSender(log *slog.Logger, cfg config.SenderConfig) *KafkaSender {
BatchSize: 1,
}
return &KafkaSender{
log: log,
log: log,

messages: *queue.NewQueue[execution.Message](),
mu: sync.Mutex{},

writer: writer,
}
}

func (s *KafkaSender) Send(ctx context.Context, msg execution.Message) error {
value, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
// Start launches the delivery loop (run) in a new goroutine and returns
// immediately. The loop exits when ctx is cancelled.
//
// NOTE(review): nothing here guards against Start being called more than
// once, and there is no way to wait for the goroutine to finish — confirm
// callers invoke it exactly once.
func (s *KafkaSender) Start(ctx context.Context) {
go s.run(ctx)
}

kafkaMsg := kafka.Message{
Key: []byte(msg.GetExecutionID().String()),
Value: value,
}
// Send appends msg to the in-memory queue and returns without performing
// any network I/O. The actual write to Kafka happens later in the delivery
// loop, so the caller gets no indication of whether delivery succeeded.
func (s *KafkaSender) Send(msg execution.Message) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.messages.Enqueue(msg)
}

// run is the delivery loop. It repeatedly waits for a backoff interval and
// then tries to write every queued message to Kafka, in queue order, until
// ctx is cancelled.
//
// The wait is 100ms * 2^consequentFails with the exponent capped at 6
// (6.4s). The failure counter resets after a pass that ends without a
// write error and increments after a pass that was aborted by one.
func (s *KafkaSender) run(ctx context.Context) {
	consequentFails := 0

	for {
		waitTime := time.Duration(100*math.Pow(2, float64(min(consequentFails, 6)))) * time.Millisecond

		// A one-shot timer is enough for a single wait; stop it on the
		// cancellation path so it does not linger until it fires.
		timer := time.NewTimer(waitTime)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}

		if s.drain(ctx) {
			consequentFails = 0
		} else {
			consequentFails++
		}
	}
}

// drain writes queued messages to Kafka one at a time until the queue is
// empty or a write fails. It reports true when the queue was emptied and
// false when a write error interrupted the pass; the failed message stays
// at the head of the queue so the next pass retries it.
func (s *KafkaSender) drain(ctx context.Context) bool {
	for {
		// Copy the head element while holding the lock so the value is not
		// read through a pointer into the queue after Send may have
		// modified it concurrently.
		s.mu.Lock()
		head := s.messages.Peek()
		var msg execution.Message
		if head != nil {
			msg = *head
		}
		s.mu.Unlock()

		if head == nil {
			return true
		}

		value, err := json.Marshal(msg)
		if err != nil {
			// Retrying cannot fix a message that does not serialize, so
			// drop it — but leave a trace instead of losing it silently.
			s.log.Error("failed to marshal message, dropping it",
				slog.Any("type", msg.GetType()),
				slog.Any("error", err))
			s.dequeue()
			continue
		}

		kafkaMsg := kafka.Message{
			Key:   []byte(msg.GetExecutionID().String()),
			Value: value,
		}

		s.log.Debug("send to kafka", slog.Any("type", msg.GetType()))
		if err = s.writer.WriteMessages(ctx, kafkaMsg); err != nil {
			s.log.Error("failed to send message to kafka", slog.Any("error", err))
			return false
		}

		// Remove the message only after a successful write.
		s.dequeue()
	}
}

// dequeue removes the head of the message queue while holding the lock.
func (s *KafkaSender) dequeue() {
	s.mu.Lock()
	s.messages.Dequeue()
	s.mu.Unlock()
}
Loading