Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Exesh/cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {

mux := chi.NewRouter()

unitOfWork, executionStorage, err := setupStorage(log, cfg.Storage)
unitOfWork, executionStorage, outboxStorage, err := setupStorage(log, cfg.Storage)
if err != nil {
log.Error("failed to setup storage", slog.String("error", err.Error()))
return
Expand All @@ -75,7 +75,7 @@ func main() {
jobFactory := factory.NewJobFactory(log, cfg.JobFactory, artifactRegistry, inputProvider)

messageFactory := factory.NewMessageFactory(log)
messageSender := sender.NewKafkaSender(log, cfg.Sender)
messageSender := sender.NewKafkaSender(log, cfg.Sender, unitOfWork, outboxStorage)
messageSender.Start(ctx)

promRegistry := prometheus.NewRegistry()
Expand Down Expand Up @@ -157,6 +157,7 @@ func setupLogger(env string) (log *slog.Logger, err error) {
func setupStorage(log *slog.Logger, cfg config.StorageConfig) (
unitOfWork *postgres.UnitOfWork,
executionStorage *postgres.ExecutionStorage,
outboxStorage *postgres.OutboxStorage,
err error,
) {
ctx, cancel := context.WithTimeout(context.Background(), cfg.InitTimeout)
Expand All @@ -165,17 +166,20 @@ func setupStorage(log *slog.Logger, cfg config.StorageConfig) (
unitOfWork, err = postgres.NewUnitOfWork(cfg)
if err != nil {
err = fmt.Errorf("failed to create unit of work: %w", err)
return unitOfWork, executionStorage, err
return unitOfWork, executionStorage, outboxStorage, err
}

err = unitOfWork.Do(ctx, func(ctx context.Context) error {
if executionStorage, err = postgres.NewExecutionStorage(ctx, log); err != nil {
return fmt.Errorf("failed to create execution storage: %w", err)
}
if outboxStorage, err = postgres.NewOutboxStorage(ctx, log); err != nil {
return fmt.Errorf("failed to create outbox storage: %w", err)
}
return nil
})

return unitOfWork, executionStorage, err
return unitOfWork, executionStorage, outboxStorage, err
}

func setupInputProvider(cfg config.InputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.InputProvider {
Expand Down
13 changes: 13 additions & 0 deletions Exesh/internal/domain/outbox/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package outbox

import (
"time"
)

type Outbox struct {
ID int64 `json:"id"`
Payload string `json:"payload"`
CreatedAt time.Time `json:"created_at"`
FailedAt *time.Time `json:"failed_at"`
FailedTries int `json:"failed_tries"`
}
32 changes: 19 additions & 13 deletions Exesh/internal/scheduler/execution_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type (
}

executionStorage interface {
GetForUpdate(context.Context, execution.ID) (*execution.Execution, error)
GetForSchedule(context.Context, time.Time) (*execution.Execution, error)
Save(context.Context, execution.Execution) error
GetExecutionForUpdate(context.Context, execution.ID) (*execution.Execution, error)
GetExecutionForSchedule(context.Context, time.Time) (*execution.Execution, error)
SaveExecution(context.Context, execution.Execution) error
}

jobFactory interface {
Expand All @@ -58,7 +58,7 @@ type (
}

messageSender interface {
Send(execution.Message)
Send(context.Context, execution.Message) error
}
)

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error {

s.changeNowExecutions(+1)
if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error {
e, err := s.executionStorage.GetForSchedule(ctx, time.Now().Add(-s.cfg.ExecutionRetryAfter))
e, err := s.executionStorage.GetExecutionForSchedule(ctx, time.Now().Add(-s.cfg.ExecutionRetryAfter))
if err != nil {
return fmt.Errorf("failed to get execution for schedule from storage: %w", err)
}
Expand All @@ -157,7 +157,7 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error {
return fmt.Errorf("failed to schedule execution: %w", err)
}

if err = s.executionStorage.Save(ctx, *e); err != nil {
if err = s.executionStorage.SaveExecution(ctx, *e); err != nil {
return fmt.Errorf("failed to update execution in storage %s: %w", e.ID.String(), err)
}

Expand All @@ -173,7 +173,9 @@ func (s *ExecutionScheduler) scheduleExecution(ctx context.Context, execCtx *exe
s.log.Info("schedule execution", slog.String("execution_id", execCtx.ExecutionID.String()))

msg := s.messageFactory.CreateExecutionStarted(execCtx)
s.messageSender.Send(msg)
if err := s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send execution started message: %w", err)
}

for _, step := range execCtx.PickSteps() {
if err := s.scheduleStep(ctx, execCtx, step); err != nil {
Expand Down Expand Up @@ -249,7 +251,7 @@ func (s *ExecutionScheduler) doneStep(
)

if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error {
e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID)
e, err := s.executionStorage.GetExecutionForUpdate(ctx, execCtx.ExecutionID)
if err != nil {
return fmt.Errorf("failed to get execution for update from storage: %w", err)
}
Expand All @@ -262,13 +264,15 @@ func (s *ExecutionScheduler) doneStep(
return fmt.Errorf("failed to create message for step: %w", err)
}

s.messageSender.Send(msg)
if err = s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send message for step: %w", err)
}

execCtx.DoneStep(step.GetName())

e.SetScheduled(time.Now())

if err = s.executionStorage.Save(ctx, *e); err != nil {
if err = s.executionStorage.SaveExecution(ctx, *e); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -326,7 +330,7 @@ func (s *ExecutionScheduler) finishExecution(
execCtx.ForceDone()

if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error {
e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID)
e, err := s.executionStorage.GetExecutionForUpdate(ctx, execCtx.ExecutionID)
if err != nil {
return fmt.Errorf("failed to get execution for update from storage: %w", err)
}
Expand All @@ -341,11 +345,13 @@ func (s *ExecutionScheduler) finishExecution(
msg = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error())
}

s.messageSender.Send(msg)
if err = s.messageSender.Send(ctx, msg); err != nil {
return fmt.Errorf("failed to send execution finished message: %w", err)
}

e.SetFinished(time.Now())

if err = s.executionStorage.Save(ctx, *e); err != nil {
if err = s.executionStorage.SaveExecution(ctx, *e); err != nil {
return err
}
return nil
Expand Down
140 changes: 94 additions & 46 deletions Exesh/internal/sender/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,56 @@ import (
"encoding/json"
"exesh/internal/config"
"exesh/internal/domain/execution"
"exesh/internal/lib/queue"
"exesh/internal/domain/outbox"
"fmt"
"log/slog"
"math"
"sync"
"strconv"
"time"

"github.com/segmentio/kafka-go"
)

type KafkaSender struct {
log *slog.Logger
type (
KafkaSender struct {
log *slog.Logger

messages queue.Queue[execution.Message]
mu sync.Mutex
unitOfWork unitOfWork
outboxStorage outboxStorage

writer *kafka.Writer
}
writer *kafka.Writer
}

outboxStorage interface {
CreateOutbox(ctx context.Context, ox outbox.Outbox) error
GetOutboxForSend(ctx context.Context) (ox *outbox.Outbox, err error)
SaveOutbox(ctx context.Context, ox outbox.Outbox) error
DeleteOutbox(ctx context.Context, ox outbox.Outbox) error
}

func NewKafkaSender(log *slog.Logger, cfg config.SenderConfig) *KafkaSender {
unitOfWork interface {
Do(context.Context, func(context.Context) error) error
}
)

func NewKafkaSender(
log *slog.Logger,
cfg config.SenderConfig,
unitOfWork unitOfWork,
outboxStorage outboxStorage,
) *KafkaSender {
writer := &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topic,
MaxAttempts: 1,
BatchSize: 1,
}

return &KafkaSender{
log: log,

messages: *queue.NewQueue[execution.Message](),
mu: sync.Mutex{},
unitOfWork: unitOfWork,
outboxStorage: outboxStorage,

writer: writer,
}
Expand All @@ -44,11 +64,23 @@ func (s *KafkaSender) Start(ctx context.Context) {
go s.run(ctx)
}

func (s *KafkaSender) Send(msg execution.Message) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *KafkaSender) Send(ctx context.Context, msg execution.Message) error {
payload, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}

ox := outbox.Outbox{
Payload: string(payload),
CreatedAt: time.Now(),
FailedAt: nil,
FailedTries: 0,
}
if err = s.outboxStorage.CreateOutbox(ctx, ox); err != nil {
return fmt.Errorf("failed to create outbox: %w", err)
}

s.messages.Enqueue(msg)
return nil
}

func (s *KafkaSender) run(ctx context.Context) {
Expand All @@ -65,45 +97,61 @@ func (s *KafkaSender) run(ctx context.Context) {
break
}

ok := true
for {
s.mu.Lock()
msg := s.messages.Peek()
s.mu.Unlock()
if err := s.process(ctx); err != nil {
s.log.Error("failed to process outbox", slog.Any("error", err))
consequentFails++
continue
}

if msg == nil {
break
}
consequentFails = 0
}
}

value, err := json.Marshal(*msg)
if err != nil {
s.mu.Lock()
s.messages.Dequeue()
s.mu.Unlock()
continue
}
func (s *KafkaSender) process(ctx context.Context) error {
uowCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

kafkaMsg := kafka.Message{
Key: []byte((*msg).GetExecutionID().String()),
Value: value,
}
if err := s.unitOfWork.Do(uowCtx, func(ctx context.Context) error {
ox, err := s.outboxStorage.GetOutboxForSend(ctx)
if err != nil {
return fmt.Errorf("failed to get outbox for send: %w", err)
}

if ox == nil {
return nil
}

s.log.Debug("send to kafka", slog.Any("type", (*msg).GetType()))
if err = s.writer.WriteMessages(ctx, kafkaMsg); err != nil {
s.log.Error("failed to send message to kafka", slog.Any("error", err))
ok = false
break
if ox.FailedTries != 0 {
retryTimeout := time.Duration(100 * math.Pow(2, float64(min(ox.FailedTries, 6))))
if ox.FailedAt.Add(retryTimeout * time.Millisecond).Before(time.Now()) {
return nil
}
}

s.mu.Lock()
s.messages.Dequeue()
s.mu.Unlock()
message := kafka.Message{
Key: []byte(strconv.FormatInt(ox.ID, 10)),
Value: []byte(ox.Payload),
}

if ok {
consequentFails = 0
} else {
consequentFails++
s.log.Debug("send to kafka", slog.Int64("outbox_id", ox.ID))
err = s.writer.WriteMessages(ctx, message)
if err != nil {
failedAt := time.Now()
ox.FailedAt = &failedAt
ox.FailedTries++

_ = s.outboxStorage.SaveOutbox(ctx, *ox)
return fmt.Errorf("failed to send message to kafka: %w", err)
}

if err = s.outboxStorage.DeleteOutbox(ctx, *ox); err != nil {
return fmt.Errorf("failed to delete outbox: %w", err)
}

return nil
}); err != nil {
return err
}

return nil
}
Loading
Loading