From 36b41a4650be97d52d4bebe3bc4496f179d95694 Mon Sep 17 00:00:00 2001 From: divancode Date: Sun, 25 Jan 2026 18:51:56 +0300 Subject: [PATCH] feat(exesh): outbox in postgres to send messages --- Exesh/cmd/coordinator/main.go | 12 +- Exesh/internal/domain/outbox/outbox.go | 13 ++ .../internal/scheduler/execution_scheduler.go | 32 ++-- Exesh/internal/sender/message_sender.go | 140 ++++++++++++------ .../storage/postgres/execution_storage.go | 38 ++--- .../storage/postgres/outbox_storage.go | 108 ++++++++++++++ Exesh/internal/usecase/execute/usecase.go | 4 +- 7 files changed, 263 insertions(+), 84 deletions(-) create mode 100644 Exesh/internal/domain/outbox/outbox.go create mode 100644 Exesh/internal/storage/postgres/outbox_storage.go diff --git a/Exesh/cmd/coordinator/main.go b/Exesh/cmd/coordinator/main.go index f5847321..f3a47e1d 100644 --- a/Exesh/cmd/coordinator/main.go +++ b/Exesh/cmd/coordinator/main.go @@ -51,7 +51,7 @@ func main() { mux := chi.NewRouter() - unitOfWork, executionStorage, err := setupStorage(log, cfg.Storage) + unitOfWork, executionStorage, outboxStorage, err := setupStorage(log, cfg.Storage) if err != nil { log.Error("failed to setup storage", slog.String("error", err.Error())) return @@ -75,7 +75,7 @@ func main() { jobFactory := factory.NewJobFactory(log, cfg.JobFactory, artifactRegistry, inputProvider) messageFactory := factory.NewMessageFactory(log) - messageSender := sender.NewKafkaSender(log, cfg.Sender) + messageSender := sender.NewKafkaSender(log, cfg.Sender, unitOfWork, outboxStorage) messageSender.Start(ctx) promRegistry := prometheus.NewRegistry() @@ -157,6 +157,7 @@ func setupLogger(env string) (log *slog.Logger, err error) { func setupStorage(log *slog.Logger, cfg config.StorageConfig) ( unitOfWork *postgres.UnitOfWork, executionStorage *postgres.ExecutionStorage, + outboxStorage *postgres.OutboxStorage, err error, ) { ctx, cancel := context.WithTimeout(context.Background(), cfg.InitTimeout) @@ -165,17 +166,20 @@ func setupStorage(log *slog.Logger, cfg config.StorageConfig) ( unitOfWork, err = postgres.NewUnitOfWork(cfg) if err != nil { err = fmt.Errorf("failed to create unit of work: %w", err) - return unitOfWork, executionStorage, err + return unitOfWork, executionStorage, outboxStorage, err } err = unitOfWork.Do(ctx, func(ctx context.Context) error { if executionStorage, err = postgres.NewExecutionStorage(ctx, log); err != nil { return fmt.Errorf("failed to create execution storage: %w", err) } + if outboxStorage, err = postgres.NewOutboxStorage(ctx, log); err != nil { + return fmt.Errorf("failed to create outbox storage: %w", err) + } return nil }) - return unitOfWork, executionStorage, err + return unitOfWork, executionStorage, outboxStorage, err } func setupInputProvider(cfg config.InputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.InputProvider { diff --git a/Exesh/internal/domain/outbox/outbox.go b/Exesh/internal/domain/outbox/outbox.go new file mode 100644 index 00000000..3f9f4a48 --- /dev/null +++ b/Exesh/internal/domain/outbox/outbox.go @@ -0,0 +1,13 @@ +package outbox + +import ( + "time" +) + +type Outbox struct { + ID int64 `json:"id"` + Payload string `json:"payload"` + CreatedAt time.Time `json:"created_at"` + FailedAt *time.Time `json:"failed_at"` + FailedTries int `json:"failed_tries"` +} diff --git a/Exesh/internal/scheduler/execution_scheduler.go b/Exesh/internal/scheduler/execution_scheduler.go index caa70d53..80cc9048 100644 --- a/Exesh/internal/scheduler/execution_scheduler.go +++ b/Exesh/internal/scheduler/execution_scheduler.go @@ -37,9 +37,9 @@ type ( } executionStorage interface { - GetForUpdate(context.Context, execution.ID) (*execution.Execution, error) - GetForSchedule(context.Context, time.Time) (*execution.Execution, error) - Save(context.Context, execution.Execution) error + GetExecutionForUpdate(context.Context, execution.ID) (*execution.Execution, error) + GetExecutionForSchedule(context.Context, time.Time) (*execution.Execution, error) + SaveExecution(context.Context, execution.Execution) error } jobFactory interface { @@ -58,7 +58,7 @@ type ( } messageSender interface { - Send(execution.Message) + Send(context.Context, execution.Message) error } ) @@ -137,7 +137,7 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { s.changeNowExecutions(+1) if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - e, err := s.executionStorage.GetForSchedule(ctx, time.Now().Add(-s.cfg.ExecutionRetryAfter)) + e, err := s.executionStorage.GetExecutionForSchedule(ctx, time.Now().Add(-s.cfg.ExecutionRetryAfter)) if err != nil { return fmt.Errorf("failed to get execution for schedule from storage: %w", err) } @@ -157,7 +157,7 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { return fmt.Errorf("failed to schedule execution: %w", err) } - if err = s.executionStorage.Save(ctx, *e); err != nil { + if err = s.executionStorage.SaveExecution(ctx, *e); err != nil { return fmt.Errorf("failed to update execution in storage %s: %w", e.ID.String(), err) } @@ -173,7 +173,9 @@ func (s *ExecutionScheduler) scheduleExecution(ctx context.Context, execCtx *exe s.log.Info("schedule execution", slog.String("execution_id", execCtx.ExecutionID.String())) msg := s.messageFactory.CreateExecutionStarted(execCtx) - s.messageSender.Send(msg) + if err := s.messageSender.Send(ctx, msg); err != nil { + return fmt.Errorf("failed to send execution started message: %w", err) + } for _, step := range execCtx.PickSteps() { if err := s.scheduleStep(ctx, execCtx, step); err != nil { @@ -249,7 +251,7 @@ func (s *ExecutionScheduler) doneStep( ) if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID) + e, err := s.executionStorage.GetExecutionForUpdate(ctx, execCtx.ExecutionID) if err != nil { return fmt.Errorf("failed to get execution for update from storage: %w", err) } @@ -262,13 +264,15 @@ func (s *ExecutionScheduler) doneStep( return fmt.Errorf("failed to create message for step: %w", err) } - s.messageSender.Send(msg) + if err = s.messageSender.Send(ctx, msg); err != nil { + return fmt.Errorf("failed to send message for step: %w", err) + } execCtx.DoneStep(step.GetName()) e.SetScheduled(time.Now()) - if err = s.executionStorage.Save(ctx, *e); err != nil { + if err = s.executionStorage.SaveExecution(ctx, *e); err != nil { return err } return nil @@ -326,7 +330,7 @@ func (s *ExecutionScheduler) finishExecution( execCtx.ForceDone() if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID) + e, err := s.executionStorage.GetExecutionForUpdate(ctx, execCtx.ExecutionID) if err != nil { return fmt.Errorf("failed to get execution for update from storage: %w", err) } @@ -341,11 +345,13 @@ func (s *ExecutionScheduler) finishExecution( msg = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error()) } - s.messageSender.Send(msg) + if err = s.messageSender.Send(ctx, msg); err != nil { + return fmt.Errorf("failed to send execution finished message: %w", err) + } e.SetFinished(time.Now()) - if err = s.executionStorage.Save(ctx, *e); err != nil { + if err = s.executionStorage.SaveExecution(ctx, *e); err != nil { return err } return nil diff --git a/Exesh/internal/sender/message_sender.go b/Exesh/internal/sender/message_sender.go index 8a8d6a4c..5d62043c 100644 --- a/Exesh/internal/sender/message_sender.go +++ b/Exesh/internal/sender/message_sender.go @@ -5,36 +5,56 @@ import ( "encoding/json" "exesh/internal/config" "exesh/internal/domain/execution" - "exesh/internal/lib/queue" + "exesh/internal/domain/outbox" + "fmt" "log/slog" "math" - "sync" + "strconv" "time" "github.com/segmentio/kafka-go" ) -type KafkaSender struct { - log *slog.Logger +type ( + KafkaSender struct { + log *slog.Logger - messages queue.Queue[execution.Message] - mu sync.Mutex + unitOfWork unitOfWork + outboxStorage outboxStorage - writer *kafka.Writer -} + writer *kafka.Writer + } + + outboxStorage interface { + CreateOutbox(ctx context.Context, ox outbox.Outbox) error + GetOutboxForSend(ctx context.Context) (ox *outbox.Outbox, err error) + SaveOutbox(ctx context.Context, ox outbox.Outbox) error + DeleteOutbox(ctx context.Context, ox outbox.Outbox) error + } -func NewKafkaSender(log *slog.Logger, cfg config.SenderConfig) *KafkaSender { + unitOfWork interface { + Do(context.Context, func(context.Context) error) error + } +) + +func NewKafkaSender( + log *slog.Logger, + cfg config.SenderConfig, + unitOfWork unitOfWork, + outboxStorage outboxStorage, +) *KafkaSender { writer := &kafka.Writer{ Addr: kafka.TCP(cfg.Brokers...), Topic: cfg.Topic, MaxAttempts: 1, BatchSize: 1, } + return &KafkaSender{ log: log, - messages: *queue.NewQueue[execution.Message](), - mu: sync.Mutex{}, + unitOfWork: unitOfWork, + outboxStorage: outboxStorage, writer: writer, } @@ -44,11 +64,23 @@ func (s *KafkaSender) Start(ctx context.Context) { go s.run(ctx) } -func (s *KafkaSender) Send(msg execution.Message) { - s.mu.Lock() - defer s.mu.Unlock() +func (s *KafkaSender) Send(ctx context.Context, msg execution.Message) error { + payload, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + ox := outbox.Outbox{ + Payload: string(payload), + CreatedAt: time.Now(), + FailedAt: nil, + FailedTries: 0, + } + if err = s.outboxStorage.CreateOutbox(ctx, ox); err != nil { + return fmt.Errorf("failed to create outbox: %w", err) + } - s.messages.Enqueue(msg) + return nil } func (s *KafkaSender) run(ctx context.Context) { @@ -65,45 +97,61 @@ func (s *KafkaSender) run(ctx context.Context) { break } - ok := true - for { - s.mu.Lock() - msg := s.messages.Peek() - s.mu.Unlock() + if err := s.process(ctx); err != nil { + s.log.Error("failed to process outbox", slog.Any("error", err)) + consequentFails++ + continue + } - if msg == nil { - break - } + consequentFails = 0 + } +} - value, err := json.Marshal(*msg) - if err != nil { - s.mu.Lock() - s.messages.Dequeue() - s.mu.Unlock() - continue - } +func (s *KafkaSender) process(ctx context.Context) error { + uowCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() - kafkaMsg := kafka.Message{ - Key: []byte((*msg).GetExecutionID().String()), - Value: value, - } + if err := s.unitOfWork.Do(uowCtx, func(ctx context.Context) error { + ox, err := s.outboxStorage.GetOutboxForSend(ctx) + if err != nil { + return fmt.Errorf("failed to get outbox for send: %w", err) + } + + if ox == nil { + return nil + } - s.log.Debug("send to kafka", slog.Any("type", (*msg).GetType())) - if err = s.writer.WriteMessages(ctx, kafkaMsg); err != nil { - s.log.Error("failed to send message to kafka", slog.Any("error", err)) - ok = false - break + if ox.FailedTries != 0 { + retryTimeout := time.Duration(100 * math.Pow(2, float64(min(ox.FailedTries, 6)))) + if ox.FailedAt.Add(retryTimeout * time.Millisecond).Before(time.Now()) { + return nil } + } - s.mu.Lock() - s.messages.Dequeue() - s.mu.Unlock() + message := kafka.Message{ + Key: []byte(strconv.FormatInt(ox.ID, 10)), + Value: []byte(ox.Payload), } - if ok { - consequentFails = 0 - } else { - consequentFails++ + s.log.Debug("send to kafka", slog.Int64("outbox_id", ox.ID)) + err = s.writer.WriteMessages(ctx, message) + if err != nil { + failedAt := time.Now() + ox.FailedAt = &failedAt + ox.FailedTries++ + + _ = s.outboxStorage.SaveOutbox(ctx, *ox) + return fmt.Errorf("failed to send message to kafka: %w", err) } + + if err = s.outboxStorage.DeleteOutbox(ctx, *ox); err != nil { + return fmt.Errorf("failed to delete outbox: %w", err) + } + + return nil + }); err != nil { + return err } + + return nil } diff --git a/Exesh/internal/storage/postgres/execution_storage.go b/Exesh/internal/storage/postgres/execution_storage.go index df6cc2ff..8afcdef8 100644 --- a/Exesh/internal/storage/postgres/execution_storage.go +++ b/Exesh/internal/storage/postgres/execution_storage.go @@ -17,7 +17,7 @@ type ExecutionStorage struct { } const ( - createTableQuery = ` + createExecutionTableQuery = ` CREATE TABLE IF NOT EXISTS Executions( id varchar(36) PRIMARY KEY, steps jsonb, @@ -28,18 +28,18 @@ const ( ); ` - insertQuery = ` + insertExecutionQuery = ` INSERT INTO Executions(id, steps, status, created_at, scheduled_at, finished_at) VALUES ($1, $2, $3, $4, $5, $6); ` - selectForUpdateQuery = ` + selectExecutionForUpdateQuery = ` SELECT id, steps, status, created_at, scheduled_at, finished_at FROM Executions WHERE id = $1 FOR UPDATE ` - selectForScheduleQuery = ` + selectExecutionForScheduleQuery = ` SELECT id, steps, status, created_at, scheduled_at, finished_at FROM Executions WHERE status = $1 OR (status = $2 AND scheduled_at < $3) ORDER BY created_at @@ -47,7 +47,7 @@ const ( FOR UPDATE; ` - updateQuery = ` + updateExecutionQuery = ` UPDATE Executions SET steps=$2, status=$3, created_at=$4, scheduled_at=$5, finished_at=$6 WHERE id=$1; ` @@ -56,38 +56,38 @@ const ( func NewExecutionStorage(ctx context.Context, log *slog.Logger) (*ExecutionStorage, error) { tx := extractTx(ctx) - if _, err := tx.ExecContext(ctx, createTableQuery); err != nil { - return nil, fmt.Errorf("failed to create table: %w", err) + if _, err := tx.ExecContext(ctx, createExecutionTableQuery); err != nil { + return nil, fmt.Errorf("failed to create execution table: %w", err) } return &ExecutionStorage{log: log}, nil } -func (s *ExecutionStorage) Create(ctx context.Context, e execution.Execution) error { +func (s *ExecutionStorage) CreateExecution(ctx context.Context, e execution.Execution) error { tx := extractTx(ctx) - if _, err := tx.ExecContext(ctx, insertQuery, + if _, err := tx.ExecContext(ctx, insertExecutionQuery, e.ID, e.Steps, e.Status, e.CreatedAt, e.ScheduledAt, e.FinishedAt); err != nil { - return fmt.Errorf("failed to do insert query: %w", err) + return fmt.Errorf("failed to do insert execution query: %w", err) } return nil } -func (s *ExecutionStorage) GetForUpdate(ctx context.Context, id execution.ID) (e *execution.Execution, err error) { +func (s *ExecutionStorage) GetExecutionForUpdate(ctx context.Context, id execution.ID) (e *execution.Execution, err error) { tx := extractTx(ctx) e = &execution.Execution{} var eid string var stepsRaw json.RawMessage - if err = tx.QueryRowContext(ctx, selectForUpdateQuery, id). + if err = tx.QueryRowContext(ctx, selectExecutionForUpdateQuery, id). Scan(&eid, &stepsRaw, &e.Status, &e.CreatedAt, &e.ScheduledAt, &e.FinishedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { e = nil err = nil return } - err = fmt.Errorf("failed to do select query: %w", err) + err = fmt.Errorf("failed to do select execution for update query: %w", err) return } @@ -104,13 +104,13 @@ func (s *ExecutionStorage) GetForUpdate(ctx context.Context, id execution.ID) (e return } -func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time.Time) (e *execution.Execution, err error) { +func (s *ExecutionStorage) GetExecutionForSchedule(ctx context.Context, retryBefore time.Time) (e *execution.Execution, err error) { tx := extractTx(ctx) e = &execution.Execution{} var eid string var stepsRaw json.RawMessage - if err = tx.QueryRowContext(ctx, selectForScheduleQuery, + if err = tx.QueryRowContext(ctx, selectExecutionForScheduleQuery, execution.StatusNewExecution, execution.StatusScheduledExecution, retryBefore). Scan(&eid, &stepsRaw, &e.Status, &e.CreatedAt, &e.ScheduledAt, &e.FinishedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -118,7 +118,7 @@ func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time. err = nil return } - err = fmt.Errorf("failed to do select query: %w", err) + err = fmt.Errorf("failed to do select execution for schedule query: %w", err) return } @@ -135,12 +135,12 @@ func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time. return } -func (s *ExecutionStorage) Save(ctx context.Context, e execution.Execution) error { +func (s *ExecutionStorage) SaveExecution(ctx context.Context, e execution.Execution) error { tx := extractTx(ctx) - if _, err := tx.ExecContext(ctx, updateQuery, + if _, err := tx.ExecContext(ctx, updateExecutionQuery, e.ID, e.Steps, e.Status, e.CreatedAt, e.ScheduledAt, e.FinishedAt); err != nil { - return fmt.Errorf("failed to do update query: %w", err) + return fmt.Errorf("failed to do update execution query: %w", err) } return nil diff --git a/Exesh/internal/storage/postgres/outbox_storage.go b/Exesh/internal/storage/postgres/outbox_storage.go new file mode 100644 index 00000000..3c0222a4 --- /dev/null +++ b/Exesh/internal/storage/postgres/outbox_storage.go @@ -0,0 +1,108 @@ +package postgres + +import ( + "context" + "database/sql" + "errors" + "exesh/internal/domain/outbox" + "fmt" + "log/slog" +) + +type OutboxStorage struct { + log *slog.Logger +} + +const ( + createOutboxTableQuery = ` + CREATE TABLE IF NOT EXISTS Outbox( + id BIGSERIAL PRIMARY KEY, + message text, + created_at timestamp, + failed_at timestamp NULL, + failed_tries integer + ); + ` + + insertOutboxQuery = ` + INSERT INTO Outbox(message, created_at, failed_at, failed_tries) + VALUES ($1, $2, $3, $4); + ` + + selectOutboxForSendQuery = ` + SELECT id, message, created_at, failed_at, failed_tries FROM Outbox + ORDER BY created_at + LIMIT 1 + FOR UPDATE + ` + + updateOutboxQuery = ` + UPDATE Outbox SET message=$2, created_at=$3, failed_at=$4, failed_tries=$5 + WHERE id=$1; + ` + + deleteOutboxQuery = ` + DELETE FROM Outbox + WHERE id=$1; + ` +) + +func NewOutboxStorage(ctx context.Context, log *slog.Logger) (*OutboxStorage, error) { + tx := extractTx(ctx) + + if _, err := tx.ExecContext(ctx, createOutboxTableQuery); err != nil { + return nil, fmt.Errorf("failed to create outbox table: %w", err) + } + + return &OutboxStorage{log: log}, nil +} + +func (s *OutboxStorage) CreateOutbox(ctx context.Context, ox outbox.Outbox) error { + tx := extractTx(ctx) + + if _, err := tx.ExecContext(ctx, insertOutboxQuery, + ox.Payload, ox.CreatedAt, ox.FailedAt, ox.FailedTries); err != nil { + return fmt.Errorf("failed to do insert outbox query: %w", err) + } + + return nil +} + +func (s *OutboxStorage) GetOutboxForSend(ctx context.Context) (ox *outbox.Outbox, err error) { + tx := extractTx(ctx) + + ox = &outbox.Outbox{} + if err = tx.QueryRowContext(ctx, selectOutboxForSendQuery). + Scan(&ox.ID, &ox.Payload, &ox.CreatedAt, &ox.FailedAt, &ox.FailedTries); err != nil { + if errors.Is(err, sql.ErrNoRows) { + ox = nil + err = nil + return + } + err = fmt.Errorf("failed to do select outbox for send query: %w", err) + return + } + + return +} + +func (s *OutboxStorage) SaveOutbox(ctx context.Context, ox outbox.Outbox) error { + tx := extractTx(ctx) + + if _, err := tx.ExecContext(ctx, updateOutboxQuery, + ox.ID, ox.Payload, ox.CreatedAt, ox.FailedAt, ox.FailedTries); err != nil { + return fmt.Errorf("failed to do update outbox query: %w", err) + } + + return nil +} + +func (s *OutboxStorage) DeleteOutbox(ctx context.Context, ox outbox.Outbox) error { + tx := extractTx(ctx) + + if _, err := tx.ExecContext(ctx, deleteOutboxQuery, ox.ID); err != nil { + return fmt.Errorf("failed to do delete outbox query: %w", err) + } + + return nil +} diff --git a/Exesh/internal/usecase/execute/usecase.go b/Exesh/internal/usecase/execute/usecase.go index 2fb852db..243c82cc 100644 --- a/Exesh/internal/usecase/execute/usecase.go +++ b/Exesh/internal/usecase/execute/usecase.go @@ -19,7 +19,7 @@ type ( } executionStorage interface { - Create(context.Context, execution.Execution) error + CreateExecution(context.Context, execution.Execution) error } ) @@ -47,7 +47,7 @@ func (uc *UseCase) Execute(ctx context.Context, command Command) (result Result, err = uc.unitOfWork.Do(ctx, func(ctx context.Context) error { e := execution.NewExecution(command.Steps) - if err = uc.executionStorage.Create(ctx, e); err != nil { + if err = uc.executionStorage.CreateExecution(ctx, e); err != nil { return fmt.Errorf("failed to create execution in storage: %w", err) }