From a3fdf7e8541e5f03267ee5dc7a60eeced9b22444 Mon Sep 17 00:00:00 2001 From: divancode Date: Sun, 25 Jan 2026 13:28:13 +0300 Subject: [PATCH] feat(exesh): in memory outbox message sender --- Exesh/cmd/coordinator/main.go | 1 + Exesh/internal/factory/message_factory.go | 12 +-- Exesh/internal/pool/worker_pool.go | 13 ++- .../internal/scheduler/execution_scheduler.go | 36 +++---- Exesh/internal/sender/message_sender.go | 95 +++++++++++++++---- 5 files changed, 106 insertions(+), 51 deletions(-) diff --git a/Exesh/cmd/coordinator/main.go b/Exesh/cmd/coordinator/main.go index 2a96d738..f5847321 100644 --- a/Exesh/cmd/coordinator/main.go +++ b/Exesh/cmd/coordinator/main.go @@ -76,6 +76,7 @@ func main() { messageFactory := factory.NewMessageFactory(log) messageSender := sender.NewKafkaSender(log, cfg.Sender) + messageSender.Start(ctx) promRegistry := prometheus.NewRegistry() promRegistry.MustRegister( diff --git a/Exesh/internal/factory/message_factory.go b/Exesh/internal/factory/message_factory.go index 586105e9..dbb2b74a 100644 --- a/Exesh/internal/factory/message_factory.go +++ b/Exesh/internal/factory/message_factory.go @@ -18,8 +18,8 @@ func NewMessageFactory(log *slog.Logger) *MessageFactory { } } -func (f *MessageFactory) CreateExecutionStarted(execCtx *execution.Context) (execution.Message, error) { - return messages.NewStartExecutionMessage(execCtx.ExecutionID), nil +func (f *MessageFactory) CreateExecutionStarted(execCtx *execution.Context) execution.Message { + return messages.NewStartExecutionMessage(execCtx.ExecutionID) } func (f *MessageFactory) CreateForStep(execCtx *execution.Context, step execution.Step, result execution.Result) (execution.Message, error) { @@ -48,10 +48,10 @@ func (f *MessageFactory) CreateForStep(execCtx *execution.Context, step executio } } -func (f *MessageFactory) CreateExecutionFinished(execCtx *execution.Context) (execution.Message, error) { - return messages.NewFinishExecutionMessage(execCtx.ExecutionID), nil +func (f *MessageFactory) CreateExecutionFinished(execCtx *execution.Context) execution.Message { + return messages.NewFinishExecutionMessage(execCtx.ExecutionID) } -func (f *MessageFactory) CreateExecutionFinishedError(execCtx *execution.Context, err string) (execution.Message, error) { - return messages.NewFinishExecutionMessageError(execCtx.ExecutionID, err), nil +func (f *MessageFactory) CreateExecutionFinishedError(execCtx *execution.Context, err string) execution.Message { + return messages.NewFinishExecutionMessageError(execCtx.ExecutionID, err) } diff --git a/Exesh/internal/pool/worker_pool.go b/Exesh/internal/pool/worker_pool.go index 1f896528..2d1bdf9e 100644 --- a/Exesh/internal/pool/worker_pool.go +++ b/Exesh/internal/pool/worker_pool.go @@ -23,9 +23,9 @@ func NewWorkerPool(log *slog.Logger, cfg config.WorkerPoolConfig) *WorkerPool { log: log, cfg: cfg, - mu: sync.Mutex{}, + mu: sync.Mutex{}, heartbeats: make(map[string]chan any), - stop: false, + stop: false, } } @@ -70,11 +70,13 @@ func (p *WorkerPool) createWorker(workerID string) { func (p *WorkerPool) runObserver(workerID string) { for { p.mu.Lock() + stop := p.stop heartbeat, ok := p.heartbeats[workerID] - if p.stop || !ok { + p.mu.Unlock() + + if stop || !ok { break } - p.mu.Unlock() timer := time.NewTicker(p.cfg.WorkerDieAfter) @@ -88,5 +90,8 @@ func (p *WorkerPool) runObserver(workerID string) { } func (p *WorkerPool) deleteWorker(workerID string) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.heartbeats, workerID) } diff --git a/Exesh/internal/scheduler/execution_scheduler.go b/Exesh/internal/scheduler/execution_scheduler.go index 5486c62a..caa70d53 100644 --- a/Exesh/internal/scheduler/execution_scheduler.go +++ b/Exesh/internal/scheduler/execution_scheduler.go @@ -51,14 +51,14 @@ type ( } messageFactory interface { - CreateExecutionStarted(*execution.Context) (execution.Message, error) + CreateExecutionStarted(*execution.Context) execution.Message CreateForStep(*execution.Context, execution.Step, execution.Result) (execution.Message, error) - CreateExecutionFinished(*execution.Context) (execution.Message, error) - CreateExecutionFinishedError(*execution.Context, string) (execution.Message, error) + CreateExecutionFinished(*execution.Context) execution.Message + CreateExecutionFinishedError(*execution.Context, string) execution.Message } messageSender interface { - Send(context.Context, execution.Message) error + Send(execution.Message) } ) @@ -172,16 +172,11 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { func (s *ExecutionScheduler) scheduleExecution(ctx context.Context, execCtx *execution.Context) error { s.log.Info("schedule execution", slog.String("execution_id", execCtx.ExecutionID.String())) - msg, err := s.messageFactory.CreateExecutionStarted(execCtx) - if err != nil { - return fmt.Errorf("failed to create execution started message: %w", err) - } - if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send %s message: %w", msg.GetType(), err) - } + msg := s.messageFactory.CreateExecutionStarted(execCtx) + s.messageSender.Send(msg) for _, step := range execCtx.PickSteps() { - if err = s.scheduleStep(ctx, execCtx, step); err != nil { + if err := s.scheduleStep(ctx, execCtx, step); err != nil { return err } } @@ -266,9 +261,8 @@ func (s *ExecutionScheduler) doneStep( if err != nil { return fmt.Errorf("failed to create message for step: %w", err) } - if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send message for step: %w", err) - } + + s.messageSender.Send(msg) execCtx.DoneStep(step.GetName()) @@ -342,17 +336,13 @@ func (s *ExecutionScheduler) finishExecution( var msg execution.Message if execError == nil { - msg, err = s.messageFactory.CreateExecutionFinished(execCtx) + msg = s.messageFactory.CreateExecutionFinished(execCtx) } else { - msg, err = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error()) - } - if err != nil { - return fmt.Errorf("failed to create execution finished message: %w", err) - } - if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send execution finished message: %w", err) + msg = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error()) } + s.messageSender.Send(msg) + e.SetFinished(time.Now()) if err = s.executionStorage.Save(ctx, *e); err != nil { diff --git a/Exesh/internal/sender/message_sender.go b/Exesh/internal/sender/message_sender.go index aa37b124..8a8d6a4c 100644 --- a/Exesh/internal/sender/message_sender.go +++ b/Exesh/internal/sender/message_sender.go @@ -5,14 +5,21 @@ import ( "encoding/json" "exesh/internal/config" "exesh/internal/domain/execution" - "fmt" + "exesh/internal/lib/queue" "log/slog" + "math" + "sync" + "time" "github.com/segmentio/kafka-go" ) type KafkaSender struct { - log *slog.Logger + log *slog.Logger + + messages queue.Queue[execution.Message] + mu sync.Mutex + writer *kafka.Writer } @@ -24,27 +31,79 @@ func NewKafkaSender(log *slog.Logger, cfg config.SenderConfig) *KafkaSender { BatchSize: 1, } return &KafkaSender{ - log: log, + log: log, + + messages: *queue.NewQueue[execution.Message](), + mu: sync.Mutex{}, + writer: writer, } } -func (s *KafkaSender) Send(ctx context.Context, msg execution.Message) error { - value, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("failed to marshal message: %w", err) - } +func (s *KafkaSender) Start(ctx context.Context) { + go s.run(ctx) +} - kafkaMsg := kafka.Message{ - Key: []byte(msg.GetExecutionID().String()), - Value: value, - } +func (s *KafkaSender) Send(msg execution.Message) { + s.mu.Lock() + defer s.mu.Unlock() - s.log.Info("sending message to kafka", slog.Any("msg_type", msg.GetType())) - if err = s.writer.WriteMessages(ctx, kafkaMsg); err != nil { - return fmt.Errorf("failed to send message to kafka: %w", err) - } - s.log.Info("sending ok") + s.messages.Enqueue(msg) +} + +func (s *KafkaSender) run(ctx context.Context) { + consequentFails := 0 - return nil + for { + waitTime := time.Duration(100 * math.Pow(2, float64(min(consequentFails, 6)))) + timer := time.NewTicker(waitTime * time.Millisecond) + + select { + case <-ctx.Done(): + return + case <-timer.C: + break + } + + ok := true + for { + s.mu.Lock() + msg := s.messages.Peek() + s.mu.Unlock() + + if msg == nil { + break + } + + value, err := json.Marshal(*msg) + if err != nil { + s.mu.Lock() + s.messages.Dequeue() + s.mu.Unlock() + continue + } + + kafkaMsg := kafka.Message{ + Key: []byte((*msg).GetExecutionID().String()), + Value: value, + } + + s.log.Debug("send to kafka", slog.Any("type", (*msg).GetType())) + if err = s.writer.WriteMessages(ctx, kafkaMsg); err != nil { + s.log.Error("failed to send message to kafka", slog.Any("error", err)) + ok = false + break + } + + s.mu.Lock() + s.messages.Dequeue() + s.mu.Unlock() + } + + if ok { + consequentFails = 0 + } else { + consequentFails++ + } + } }