From 686890007547f93023a0a4f6cf48d41f8b83193a Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 09:02:13 +0300 Subject: [PATCH 1/8] Fix #71: use wide events in queue and scheduler --- auth/handler_delete_test.go | 10 +- auth/middleware_test.go | 10 +- httpserver/httpserver_test.go | 12 +-- httpserver/recover_test.go | 9 +- log/log.go | 48 +++++++++- log/traceid_test.go | 3 +- queue/processor.go | 84 ++++++++++++++--- queue/processor_internal_test.go | 104 +++++++++++++++++++++ scheduler/scheduler.go | 36 ++++++-- scheduler/scheduler_internal_test.go | 131 +++++++++++++++++++++++++++ 10 files changed, 401 insertions(+), 46 deletions(-) create mode 100644 queue/processor_internal_test.go create mode 100644 scheduler/scheduler_internal_test.go diff --git a/auth/handler_delete_test.go b/auth/handler_delete_test.go index 70ff484..c98c21c 100644 --- a/auth/handler_delete_test.go +++ b/auth/handler_delete_test.go @@ -18,7 +18,7 @@ func TestDeleteHandler_Success(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequest(http.MethodDelete, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) ctx := context.WithValue(req.Context(), auth.UserContextKey, &auth.User{ID: "user-id"}) req = req.WithContext(ctx) w := httptest.NewRecorder() @@ -42,7 +42,7 @@ func TestDeleteHandler_WrongMethod(t *testing.T) { mockService := &mockDeleteService{} handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -60,7 +60,7 @@ func TestDeleteHandler_UserNotFound(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequest(http.MethodDelete, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -78,7 +78,7 @@ func TestDeleteHandler_InternalError(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequest(http.MethodDelete, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) ctx := context.WithValue(req.Context(), auth.UserContextKey, &auth.User{ID: "user-id"}) req = req.WithContext(ctx) w := httptest.NewRecorder() @@ -98,7 +98,7 @@ func TestDeleteHandler_NoUserInContext(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequest(http.MethodDelete, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) diff --git a/auth/middleware_test.go b/auth/middleware_test.go index f3073cd..7e3370f 100644 --- a/auth/middleware_test.go +++ b/auth/middleware_test.go @@ -25,7 +25,7 @@ func TestAuthenticationMiddleware_ValidSession(t *testing.T) { w.WriteHeader(http.StatusOK) })) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "valid-session-id"}) w := httptest.NewRecorder() @@ -48,7 +48,7 @@ func TestAuthenticationMiddleware_NoSessionCookie(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -71,7 +71,7 @@ func TestAuthenticationMiddleware_InvalidSession(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "invalid-session-id"}) w := httptest.NewRecorder() @@ -95,7 +95,7 @@ func TestAuthenticationMiddleware_UserServiceError(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "session-id"}) w := httptest.NewRecorder() @@ -119,7 +119,7 @@ func TestAuthenticationMiddleware_UserNotFound(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequest(http.MethodGet, "/", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "session-id"}) w := httptest.NewRecorder() diff --git a/httpserver/httpserver_test.go b/httpserver/httpserver_test.go index 64bb356..7f54445 100644 --- a/httpserver/httpserver_test.go +++ b/httpserver/httpserver_test.go @@ -22,7 +22,7 @@ func TestHTTPServer(t *testing.T) { w.Write([]byte("pong")) }) - r := httptest.NewRequest(http.MethodGet, "/ping", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/ping", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -48,7 +48,7 @@ func TestHTTPServer(t *testing.T) { server.Handle("/ping", pingHandler) - r := httptest.NewRequest(http.MethodGet, "/ping", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/ping", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -70,7 +70,7 @@ func TestHTTPServer(t *testing.T) { server := httpserver.New("", 0) server.HandleGroup("/hg", hg) - r := httptest.NewRequest(http.MethodGet, "/hg/test", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/hg/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -113,7 +113,7 @@ func TestHTTPServer(t *testing.T) { server.Use(customMiddleware) server.Handle("/test", &handler{}) - r := httptest.NewRequest(http.MethodGet, "/test", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -143,7 +143,7 @@ func TestHTTPServer(t *testing.T) { server.UseFunc(customMiddlewareFunc) server.Handle("/test", &handler{}) - r := httptest.NewRequest(http.MethodGet, "/test", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -186,7 +186,7 @@ func TestHTTPServer(t *testing.T) { server.Use(firstMiddleware) server.UseFunc(secondMiddlewareFunc) - r := httptest.NewRequest(http.MethodGet, "/test", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) diff --git a/httpserver/recover_test.go b/httpserver/recover_test.go index 283bec7..ddd6c62 100644 --- a/httpserver/recover_test.go +++ b/httpserver/recover_test.go @@ -1,6 +1,7 @@ package httpserver_test import ( + "context" "net/http" "net/http/httptest" "testing" @@ -34,7 +35,7 @@ func TestRecoverMiddleware_NormalOperation(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequest(http.MethodGet, "/test", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute @@ -61,7 +62,7 @@ func TestRecoverMiddleware_PanicRecovery(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequest(http.MethodGet, "/test", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute - this should not panic due to the recovery middleware @@ -89,7 +90,7 @@ func TestRecoverMiddleware_ErrorResponse(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequest(http.MethodGet, "/test", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute @@ -125,7 +126,7 @@ func TestRecoverMiddleware_MultiplePanics(t *testing.T) { // Test multiple requests to ensure middleware continues to work for i := range 3 { - req := httptest.NewRequest(http.MethodGet, "/test", nil) + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) w := httptest.NewRecorder() wrappedHandler.ServeHTTP(w, req) diff --git a/log/log.go b/log/log.go index 4b8ccb5..618cc9e 100644 --- a/log/log.go +++ b/log/log.go @@ -9,7 +9,8 @@ import ( "os" ) -type logger interface { +// DefaultLogger is the interface required by package-level logging helpers. +type DefaultLogger interface { Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) @@ -21,14 +22,55 @@ type logger interface { ErrorContext(ctx context.Context, msg string, args ...any) } +type wideEventWriter interface { + WriteEvent(ctx context.Context, e *Event) +} + // Logger is the default logger instance used by package-level logging functions. -var Logger logger = New(os.Stdout, "text", slog.LevelInfo, nil) //nolint:gochecknoglobals +var Logger DefaultLogger = New(os.Stdout, "text", slog.LevelInfo, nil) //nolint:gochecknoglobals // SetDefault sets the default logger used by the package-level logging functions. -func SetDefault(l logger) { +func SetDefault(l DefaultLogger) { Logger = l } +// WriteEvent writes a finalized wide event when the default logger supports it. +func WriteEvent(ctx context.Context, e *Event) { + if e == nil { + return + } + + writer, ok := Logger.(wideEventWriter) + if ok { + writer.WriteEvent(ctx, e) + return + } + + e.Finish() + + args := eventArgs(e.ToAttrs()) + + switch level := e.Level(); { + case level >= slog.LevelError: + Logger.ErrorContext(ctx, e.Name(), args...) + case level >= slog.LevelWarn: + Logger.WarnContext(ctx, e.Name(), args...) + case level >= slog.LevelInfo: + Logger.InfoContext(ctx, e.Name(), args...) + default: + Logger.DebugContext(ctx, e.Name(), args...) + } +} + +func eventArgs(attrs []slog.Attr) []any { + args := make([]any, 0, len(attrs)) + for _, attr := range attrs { + args = append(args, attr) + } + + return args +} + type contextKey string const ( diff --git a/log/traceid_test.go b/log/traceid_test.go index 29263ff..5fcae15 100644 --- a/log/traceid_test.go +++ b/log/traceid_test.go @@ -1,6 +1,7 @@ package log_test import ( + "context" "net/http" "net/http/httptest" "testing" @@ -22,7 +23,7 @@ func TestTraceIDMiddleware(t *testing.T) { } })) - r := httptest.NewRequest(http.MethodGet, "/", nil) + r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) w := httptest.NewRecorder() wrappedHandler.ServeHTTP(w, r) diff --git a/queue/processor.go b/queue/processor.go index fa25012..b55e738 100644 --- a/queue/processor.go +++ b/queue/processor.go @@ -2,7 +2,9 @@ package queue import ( "context" + "errors" "fmt" + "log/slog" "sync" "time" @@ -10,6 +12,13 @@ import ( "github.com/platforma-dev/platforma/log" ) +const ( + processorRunEventName = "queue.processor.run" + workerRunEventName = "queue.worker.run" +) + +var errWorkerPanicRecovered = errors.New("worker panic recovered") + // Handler defines the interface for processing jobs. type Handler[T any] interface { Handle(ctx context.Context, job T) @@ -56,11 +65,19 @@ func (p *Processor[T]) Enqueue(ctx context.Context, job T) error { // Run starts the queue processor and blocks until all workers complete. func (p *Processor[T]) Run(ctx context.Context) error { + runEvent := newProcessorRunEvent(p.workersAmount, p.shutdownTimeout) + defer log.WriteEvent(ctx, runEvent) + + runEvent.AddStep(slog.LevelInfo, "opening queue") + err := p.queue.Open(ctx) if err != nil { + runEvent.AddError(fmt.Errorf("failed to open queue: %w", err)) return fmt.Errorf("failed to open queue: %w", err) } + runEvent.AddStep(slog.LevelInfo, "starting workers") + p.wg.Add(p.workersAmount) for range p.workersAmount { workerCtx := context.WithValue(ctx, log.WorkerIDKey, uuid.NewString()) @@ -70,33 +87,72 @@ func (p *Processor[T]) Run(ctx context.Context) error { p.wg.Wait() - log.InfoContext(ctx, "all workers shut down") + runEvent.AddStep(slog.LevelInfo, "all workers shut down") err = p.queue.Close(ctx) if err != nil { + runEvent.AddError(fmt.Errorf("failed to close queue: %w", err)) return fmt.Errorf("failed to close queue: %w", err) } + runEvent.AddStep(slog.LevelInfo, "queue closed") + return nil } func (p *Processor[T]) worker(ctx context.Context) { defer p.wg.Done() - defer log.InfoContext(ctx, "worker finished") + log.WriteEvent(ctx, p.runWorker(ctx)) +} + +func newProcessorRunEvent(workersAmount int, shutdownTimeout time.Duration) *log.Event { + event := log.NewEvent(processorRunEventName) + event.AddAttrs(map[string]any{ + "queue.workersAmount": workersAmount, + "queue.shutdownTimeout": shutdownTimeout, + }) + + return event +} + +func newWorkerRunEvent(shutdownTimeout time.Duration) *log.Event { + event := log.NewEvent(workerRunEventName) + event.AddAttrs(map[string]any{ + "queue.shutdownTimeout": shutdownTimeout, + }) + + return event +} + +func (p *Processor[T]) runWorker(ctx context.Context) (event *log.Event) { + event = newWorkerRunEvent(p.shutdownTimeout) + processedJobs := 0 + drainedJobs := 0 + + defer func() { + event.AddAttrs(map[string]any{ + "queue.processedJobs": processedJobs, + "queue.drainedJobs": drainedJobs, + }) + event.AddStep(slog.LevelInfo, "worker finished") + }() defer func() { if r := recover(); r != nil { - log.ErrorContext(ctx, "worker panic recovered", "panic", r) + event.AddStep(slog.LevelError, "worker panic recovered") + event.AddError(fmt.Errorf("%w: %v", errWorkerPanicRecovered, r)) } }() - log.InfoContext(ctx, "worker started") + event.AddStep(slog.LevelInfo, "worker started") jobChan, err := p.queue.GetJobChan(ctx) if err != nil { - log.ErrorContext(ctx, "failed to get job chan", "error", err) - return + event.AddError(fmt.Errorf("failed to get job chan: %w", err)) + return event } + event.AddStep(slog.LevelInfo, "job channel opened") + // we first check for ctx.Done() in separate select statement // because select statements choose randomly if both cases are ready for { @@ -104,15 +160,16 @@ func (p *Processor[T]) worker(ctx context.Context) { select { case <-ctx.Done(): - log.InfoContext(ctx, "skipping job due to shutdown") + event.AddStep(slog.LevelInfo, "shutdown requested") breakLoop = true default: select { case job := <-jobChan: p.handler.Handle(ctx, job) + processedJobs++ case <-ctx.Done(): - log.InfoContext(ctx, "shutting down worker") + event.AddStep(slog.LevelInfo, "shutdown requested") breakLoop = true } } @@ -124,6 +181,8 @@ func (p *Processor[T]) worker(ctx context.Context) { // after context is cancelled we try to drain remaining jobs from channel // before shutdown time expired + event.AddStep(slog.LevelInfo, "draining remaining jobs") + shutdownCtx := context.WithoutCancel(ctx) shutdownCtx, cancel := context.WithTimeout(shutdownCtx, p.shutdownTimeout) defer cancel() @@ -132,15 +191,16 @@ func (p *Processor[T]) worker(ctx context.Context) { for { select { case <-shutdownCtx.Done(): - log.InfoContext(shutdownCtx, "shutdown timeout expired") - return + event.AddStep(slog.LevelInfo, "shutdown timeout expired") + return event default: select { case job := <-jobChan: p.handler.Handle(shutdownCtx, job) + drainedJobs++ case <-shutdownCtx.Done(): - log.InfoContext(shutdownCtx, "shutdown timeout expired") - return + event.AddStep(slog.LevelInfo, "shutdown timeout expired") + return event } } } diff --git a/queue/processor_internal_test.go b/queue/processor_internal_test.go new file mode 100644 index 0000000..6a1b086 --- /dev/null +++ b/queue/processor_internal_test.go @@ -0,0 +1,104 @@ +package queue + +import ( + "context" + "testing" + "time" + + "github.com/platforma-dev/platforma/log" +) + +func TestNewProcessorRunEvent(t *testing.T) { + t.Parallel() + + event := newProcessorRunEvent(4, time.Second) + + if got := event.Name(); got != processorRunEventName { + t.Fatalf("expected event name %q, got %q", processorRunEventName, got) + } + + if got, ok := event.Attr("queue.workersAmount"); !ok || got != 4 { + t.Fatalf("expected queue.workersAmount attr, got %#v, exists=%v", got, ok) + } + + if got, ok := event.Attr("queue.shutdownTimeout"); !ok || got != time.Second { + t.Fatalf("expected queue.shutdownTimeout attr, got %#v, exists=%v", got, ok) + } +} + +func TestRunWorkerEvent(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + q := &testQueue[workerJob]{ + jobChan: make(chan workerJob, 1), + } + + q.jobChan <- workerJob{data: 1} + + processor := New(HandlerFunc[workerJob](func(_ context.Context, _ workerJob) { + cancel() + }), q, 1, time.Millisecond) + + event := processor.runWorker(ctx) + + if got := event.Name(); got != workerRunEventName { + t.Fatalf("expected event name %q, got %q", workerRunEventName, got) + } + + if got, ok := event.Attr("queue.processedJobs"); !ok || got != 1 { + t.Fatalf("expected queue.processedJobs attr, got %#v, exists=%v", got, ok) + } + + if got, ok := event.Attr("queue.drainedJobs"); !ok || got != 0 { + t.Fatalf("expected queue.drainedJobs attr, got %#v, exists=%v", got, ok) + } + + steps := workerEventSteps(t, event) + if len(steps) < 5 { + t.Fatalf("expected worker event steps, got %#v", steps) + } +} + +func workerEventSteps(t *testing.T, event *log.Event) []map[string]any { + t.Helper() + + for _, attr := range event.ToAttrs() { + if attr.Key == "steps" { + steps, ok := attr.Value.Any().([]map[string]any) + if !ok { + t.Fatalf("expected []map[string]any for steps, got %T", attr.Value.Any()) + } + + return steps + } + } + + return nil +} + +type workerJob struct { + data int +} + +type testQueue[T any] struct { + jobChan chan T +} + +func (q *testQueue[T]) Open(_ context.Context) error { + return nil +} + +func (q *testQueue[T]) Close(_ context.Context) error { + return nil +} + +func (q *testQueue[T]) EnqueueJob(_ context.Context, job T) error { + q.jobChan <- job + + return nil +} + +func (q *testQueue[T]) GetJobChan(_ context.Context) (chan T, error) { + return q.jobChan, nil +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index b0a8a2b..25ec8f8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "github.com/platforma-dev/platforma/application" @@ -16,6 +17,8 @@ import ( var errEmptyCronExpression = errors.New("cron expression cannot be empty") +const taskRunEventName = "scheduler.task.run" + const cronParseOptions = cron.Minute | cron.Hour | cron.Dom | @@ -77,16 +80,8 @@ func (s *Scheduler) Run(ctx context.Context) error { // Wrap runner to maintain consistent logging with trace IDs _, err := cronScheduler.AddFunc(s.cronExpr, func() { - runCtx := context.WithValue(ctx, log.TraceIDKey, uuid.NewString()) - log.InfoContext(runCtx, "scheduler task started") - - err := s.runner.Run(runCtx) - if err != nil { - log.ErrorContext(runCtx, "error in scheduler", "error", err) - return - } - - log.InfoContext(runCtx, "scheduler task finished") + runCtx, event := s.runTask(ctx) + log.WriteEvent(runCtx, event) }) if err != nil { return fmt.Errorf("failed to add cron task: %w", err) @@ -101,3 +96,24 @@ func (s *Scheduler) Run(ctx context.Context) error { return fmt.Errorf("scheduler context canceled: %w", ctx.Err()) } + +func (s *Scheduler) runTask(ctx context.Context) (context.Context, *log.Event) { + runCtx := context.WithValue(ctx, log.TraceIDKey, uuid.NewString()) + event := log.NewEvent(taskRunEventName) + event.AddAttrs(map[string]any{ + "scheduler.cronExpr": s.cronExpr, + }) + event.AddStep(slog.LevelInfo, "scheduler task started") + + err := s.runner.Run(runCtx) + if err != nil { + event.AddStep(slog.LevelError, "scheduler task failed") + event.AddError(fmt.Errorf("scheduler task failed: %w", err)) + + return runCtx, event + } + + event.AddStep(slog.LevelInfo, "scheduler task finished") + + return runCtx, event +} diff --git a/scheduler/scheduler_internal_test.go b/scheduler/scheduler_internal_test.go new file mode 100644 index 0000000..3e0b1d9 --- /dev/null +++ b/scheduler/scheduler_internal_test.go @@ -0,0 +1,131 @@ +package scheduler + +import ( + "context" + "errors" + "log/slog" + "testing" + + "github.com/platforma-dev/platforma/application" + "github.com/platforma-dev/platforma/log" +) + +func TestRunTaskSuccess(t *testing.T) { + t.Parallel() + + s, err := New("@hourly", application.RunnerFunc(func(ctx context.Context) error { + traceID, ok := ctx.Value(log.TraceIDKey).(string) + if !ok || traceID == "" { + t.Fatalf("expected trace ID in run context, got %#v", ctx.Value(log.TraceIDKey)) + } + + return nil + })) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + + runCtx, event := s.runTask(context.Background()) + + if traceID, ok := runCtx.Value(log.TraceIDKey).(string); !ok || traceID == "" { + t.Fatalf("expected trace ID in run context, got %#v", runCtx.Value(log.TraceIDKey)) + } + + if got := event.Name(); got != taskRunEventName { + t.Fatalf("expected event name %q, got %q", taskRunEventName, got) + } + + if got, ok := event.Attr("scheduler.cronExpr"); !ok || got != "@hourly" { + t.Fatalf("expected scheduler.cronExpr attr, got %#v, exists=%v", got, ok) + } + + if got := event.Level(); got != slog.LevelInfo { + t.Fatalf("expected event level %v, got %v", slog.LevelInfo, got) + } + + steps := eventSteps(t, event) + if len(steps) != 2 { + t.Fatalf("expected 2 steps, got %d", len(steps)) + } + + if steps[0]["name"] != "scheduler task started" { + t.Fatalf("expected first step to be start, got %#v", steps[0]) + } + + if steps[1]["name"] != "scheduler task finished" { + t.Fatalf("expected second step to be finish, got %#v", steps[1]) + } + + if errorsList := eventErrors(t, event); len(errorsList) != 0 { + t.Fatalf("expected no errors, got %#v", errorsList) + } +} + +func TestRunTaskError(t *testing.T) { + t.Parallel() + + s, err := New("@daily", application.RunnerFunc(func(_ context.Context) error { + return errors.New("boom") + })) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + + _, event := s.runTask(context.Background()) + + if got := event.Level(); got != slog.LevelError { + t.Fatalf("expected event level %v, got %v", slog.LevelError, got) + } + + steps := eventSteps(t, event) + if len(steps) != 2 { + t.Fatalf("expected 2 steps, got %d", len(steps)) + } + + if steps[1]["name"] != "scheduler task failed" { + t.Fatalf("expected failure step, got %#v", steps[1]) + } + + errorsList := eventErrors(t, event) + if len(errorsList) != 1 { + t.Fatalf("expected 1 error, got %d", len(errorsList)) + } + + if errorsList[0]["error"] != "scheduler task failed: boom" { + t.Fatalf("unexpected error payload: %#v", errorsList[0]) + } +} + +func eventSteps(t *testing.T, event *log.Event) []map[string]any { + t.Helper() + + for _, attr := range event.ToAttrs() { + if attr.Key == "steps" { + steps, ok := attr.Value.Any().([]map[string]any) + if !ok { + t.Fatalf("expected []map[string]any for steps, got %T", attr.Value.Any()) + } + + return steps + } + } + + return nil +} + +func eventErrors(t *testing.T, event *log.Event) []map[string]any { + t.Helper() + + for _, attr := range event.ToAttrs() { + if attr.Key == "errors" { + errorsList, ok := attr.Value.Any().([]map[string]any) + if !ok { + t.Fatalf("expected []map[string]any for errors, got %T", attr.Value.Any()) + } + + return errorsList + } + } + + return nil +} From 067e17e1ad0f5e051bb87c321a97b8ff163aca88 Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 19:37:24 +0300 Subject: [PATCH 2/8] Fix wide event logger default interface --- log/wideevent_logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log/wideevent_logger.go b/log/wideevent_logger.go index fef2503..2338e17 100644 --- a/log/wideevent_logger.go +++ b/log/wideevent_logger.go @@ -19,7 +19,7 @@ const ( simpleLogEventName = "log.record" ) -var _ logger = (*WideEventLogger)(nil) +var _ DefaultLogger = (*WideEventLogger)(nil) // NewWideEventLogger creates a wide-event logger. func NewWideEventLogger(w io.Writer, s Sampler, loggerType string, contextKeys map[string]any) *WideEventLogger { From 235864b25a7cb0464b17c8f3e179863339823cbe Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:05:57 +0300 Subject: [PATCH 3/8] Revert unrelated request helper changes --- auth/handler_delete_test.go | 10 +++++----- auth/middleware_test.go | 10 +++++----- httpserver/httpserver_test.go | 12 ++++++------ httpserver/recover_test.go | 9 ++++----- log/traceid_test.go | 3 +-- 5 files changed, 21 insertions(+), 23 deletions(-) diff --git a/auth/handler_delete_test.go b/auth/handler_delete_test.go index c98c21c..70ff484 100644 --- a/auth/handler_delete_test.go +++ b/auth/handler_delete_test.go @@ -18,7 +18,7 @@ func TestDeleteHandler_Success(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) + req := httptest.NewRequest(http.MethodDelete, "/", nil) ctx := context.WithValue(req.Context(), auth.UserContextKey, &auth.User{ID: "user-id"}) req = req.WithContext(ctx) w := httptest.NewRecorder() @@ -42,7 +42,7 @@ func TestDeleteHandler_WrongMethod(t *testing.T) { mockService := &mockDeleteService{} handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -60,7 +60,7 @@ func TestDeleteHandler_UserNotFound(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) + req := httptest.NewRequest(http.MethodDelete, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -78,7 +78,7 @@ func TestDeleteHandler_InternalError(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) + req := httptest.NewRequest(http.MethodDelete, "/", nil) ctx := context.WithValue(req.Context(), auth.UserContextKey, &auth.User{ID: "user-id"}) req = req.WithContext(ctx) w := httptest.NewRecorder() @@ -98,7 +98,7 @@ func TestDeleteHandler_NoUserInContext(t *testing.T) { } handler := auth.NewDeleteHandler(mockService) - req := httptest.NewRequestWithContext(context.Background(), http.MethodDelete, "/", nil) + req := httptest.NewRequest(http.MethodDelete, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) diff --git a/auth/middleware_test.go b/auth/middleware_test.go index 7e3370f..f3073cd 100644 --- a/auth/middleware_test.go +++ b/auth/middleware_test.go @@ -25,7 +25,7 @@ func TestAuthenticationMiddleware_ValidSession(t *testing.T) { w.WriteHeader(http.StatusOK) })) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "valid-session-id"}) w := httptest.NewRecorder() @@ -48,7 +48,7 @@ func TestAuthenticationMiddleware_NoSessionCookie(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) w := httptest.NewRecorder() handler.ServeHTTP(w, req) @@ -71,7 +71,7 @@ func TestAuthenticationMiddleware_InvalidSession(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "invalid-session-id"}) w := httptest.NewRecorder() @@ -95,7 +95,7 @@ func TestAuthenticationMiddleware_UserServiceError(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "session-id"}) w := httptest.NewRecorder() @@ -119,7 +119,7 @@ func TestAuthenticationMiddleware_UserNotFound(t *testing.T) { t.Fatal("handler should not be called when authentication fails") })) - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + req := httptest.NewRequest(http.MethodGet, "/", nil) req.AddCookie(&http.Cookie{Name: "session", Value: "session-id"}) w := httptest.NewRecorder() diff --git a/httpserver/httpserver_test.go b/httpserver/httpserver_test.go index 7f54445..64bb356 100644 --- a/httpserver/httpserver_test.go +++ b/httpserver/httpserver_test.go @@ -22,7 +22,7 @@ func TestHTTPServer(t *testing.T) { w.Write([]byte("pong")) }) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/ping", nil) + r := httptest.NewRequest(http.MethodGet, "/ping", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -48,7 +48,7 @@ func TestHTTPServer(t *testing.T) { server.Handle("/ping", pingHandler) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/ping", nil) + r := httptest.NewRequest(http.MethodGet, "/ping", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -70,7 +70,7 @@ func TestHTTPServer(t *testing.T) { server := httpserver.New("", 0) server.HandleGroup("/hg", hg) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/hg/test", nil) + r := httptest.NewRequest(http.MethodGet, "/hg/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -113,7 +113,7 @@ func TestHTTPServer(t *testing.T) { server.Use(customMiddleware) server.Handle("/test", &handler{}) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + r := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -143,7 +143,7 @@ func TestHTTPServer(t *testing.T) { server.UseFunc(customMiddlewareFunc) server.Handle("/test", &handler{}) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + r := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) @@ -186,7 +186,7 @@ func TestHTTPServer(t *testing.T) { server.Use(firstMiddleware) server.UseFunc(secondMiddlewareFunc) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + r := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() server.ServeHTTP(w, r) diff --git a/httpserver/recover_test.go b/httpserver/recover_test.go index ddd6c62..283bec7 100644 --- a/httpserver/recover_test.go +++ b/httpserver/recover_test.go @@ -1,7 +1,6 @@ package httpserver_test import ( - "context" "net/http" "net/http/httptest" "testing" @@ -35,7 +34,7 @@ func TestRecoverMiddleware_NormalOperation(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + req := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute @@ -62,7 +61,7 @@ func TestRecoverMiddleware_PanicRecovery(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + req := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute - this should not panic due to the recovery middleware @@ -90,7 +89,7 @@ func TestRecoverMiddleware_ErrorResponse(t *testing.T) { wrappedHandler := middleware.Wrap(handler) // Create test request - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + req := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() // Execute @@ -126,7 +125,7 @@ func TestRecoverMiddleware_MultiplePanics(t *testing.T) { // Test multiple requests to ensure middleware continues to work for i := range 3 { - req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/test", nil) + req := httptest.NewRequest(http.MethodGet, "/test", nil) w := httptest.NewRecorder() wrappedHandler.ServeHTTP(w, req) diff --git a/log/traceid_test.go b/log/traceid_test.go index 5fcae15..29263ff 100644 --- a/log/traceid_test.go +++ b/log/traceid_test.go @@ -1,7 +1,6 @@ package log_test import ( - "context" "net/http" "net/http/httptest" "testing" @@ -23,7 +22,7 @@ func TestTraceIDMiddleware(t *testing.T) { } })) - r := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/", nil) + r := httptest.NewRequest(http.MethodGet, "/", nil) w := httptest.NewRecorder() wrappedHandler.ServeHTTP(w, r) From 2ab288d72dee54ca253608b48712cbcfe757d48b Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:09:13 +0300 Subject: [PATCH 4/8] Revert logger interface rename --- log/log.go | 7 +++---- log/wideevent_logger.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/log/log.go b/log/log.go index 618cc9e..92d47d7 100644 --- a/log/log.go +++ b/log/log.go @@ -9,8 +9,7 @@ import ( "os" ) -// DefaultLogger is the interface required by package-level logging helpers. -type DefaultLogger interface { +type logger interface { Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) @@ -27,10 +26,10 @@ type wideEventWriter interface { } // Logger is the default logger instance used by package-level logging functions. -var Logger DefaultLogger = New(os.Stdout, "text", slog.LevelInfo, nil) //nolint:gochecknoglobals +var Logger logger = New(os.Stdout, "text", slog.LevelInfo, nil) //nolint:gochecknoglobals // SetDefault sets the default logger used by the package-level logging functions. -func SetDefault(l DefaultLogger) { +func SetDefault(l logger) { Logger = l } diff --git a/log/wideevent_logger.go b/log/wideevent_logger.go index 2338e17..fef2503 100644 --- a/log/wideevent_logger.go +++ b/log/wideevent_logger.go @@ -19,7 +19,7 @@ const ( simpleLogEventName = "log.record" ) -var _ DefaultLogger = (*WideEventLogger)(nil) +var _ logger = (*WideEventLogger)(nil) // NewWideEventLogger creates a wide-event logger. func NewWideEventLogger(w io.Writer, s Sampler, loggerType string, contextKeys map[string]any) *WideEventLogger { From 92d0e5511d028eae2488150cebec5f331013fcf1 Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:10:11 +0300 Subject: [PATCH 5/8] remove internal tests --- queue/processor_internal_test.go | 104 --------------------- scheduler/scheduler_internal_test.go | 131 --------------------------- 2 files changed, 235 deletions(-) delete mode 100644 queue/processor_internal_test.go delete mode 100644 scheduler/scheduler_internal_test.go diff --git a/queue/processor_internal_test.go b/queue/processor_internal_test.go deleted file mode 100644 index 6a1b086..0000000 --- a/queue/processor_internal_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package queue - -import ( - "context" - "testing" - "time" - - "github.com/platforma-dev/platforma/log" -) - -func TestNewProcessorRunEvent(t *testing.T) { - t.Parallel() - - event := newProcessorRunEvent(4, time.Second) - - if got := event.Name(); got != processorRunEventName { - t.Fatalf("expected event name %q, got %q", processorRunEventName, got) - } - - if got, ok := event.Attr("queue.workersAmount"); !ok || got != 4 { - t.Fatalf("expected queue.workersAmount attr, got %#v, exists=%v", got, ok) - } - - if got, ok := event.Attr("queue.shutdownTimeout"); !ok || got != time.Second { - t.Fatalf("expected queue.shutdownTimeout attr, got %#v, exists=%v", got, ok) - } -} - -func TestRunWorkerEvent(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - q := &testQueue[workerJob]{ - jobChan: make(chan workerJob, 1), - } - - q.jobChan <- workerJob{data: 1} - - processor := New(HandlerFunc[workerJob](func(_ context.Context, _ workerJob) { - cancel() - }), q, 1, time.Millisecond) - - event := processor.runWorker(ctx) - - if got := event.Name(); got != workerRunEventName { - t.Fatalf("expected event name %q, got %q", workerRunEventName, got) - } - - if got, ok := event.Attr("queue.processedJobs"); !ok || got != 1 { - t.Fatalf("expected queue.processedJobs attr, got %#v, exists=%v", got, ok) - } - - if got, ok := event.Attr("queue.drainedJobs"); !ok || got != 0 { - t.Fatalf("expected queue.drainedJobs attr, got %#v, exists=%v", got, ok) - } - - steps := workerEventSteps(t, event) - if len(steps) < 5 { - t.Fatalf("expected worker event steps, got %#v", steps) - } -} - -func workerEventSteps(t *testing.T, event *log.Event) []map[string]any { - t.Helper() - - for _, attr := range event.ToAttrs() { - if attr.Key == "steps" { - steps, ok := attr.Value.Any().([]map[string]any) - if !ok { - t.Fatalf("expected []map[string]any for steps, got %T", attr.Value.Any()) - } - - return steps - } - } - - return nil -} - -type workerJob struct { - data int -} - -type testQueue[T any] struct { - jobChan chan T -} - -func (q *testQueue[T]) Open(_ context.Context) error { - return nil -} - -func (q *testQueue[T]) Close(_ context.Context) error { - return nil -} - -func (q *testQueue[T]) EnqueueJob(_ context.Context, job T) error { - q.jobChan <- job - - return nil -} - -func (q *testQueue[T]) GetJobChan(_ context.Context) (chan T, error) { - return q.jobChan, nil -} diff --git a/scheduler/scheduler_internal_test.go b/scheduler/scheduler_internal_test.go deleted file mode 100644 index 3e0b1d9..0000000 --- a/scheduler/scheduler_internal_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package scheduler - -import ( - "context" - "errors" - "log/slog" - "testing" - - "github.com/platforma-dev/platforma/application" - "github.com/platforma-dev/platforma/log" -) - -func TestRunTaskSuccess(t *testing.T) { - t.Parallel() - - s, err := New("@hourly", application.RunnerFunc(func(ctx context.Context) error { - traceID, ok := ctx.Value(log.TraceIDKey).(string) - if !ok || traceID == "" { - t.Fatalf("expected trace ID in run context, got %#v", ctx.Value(log.TraceIDKey)) - } - - return nil - })) - if err != nil { - t.Fatalf("failed to create scheduler: %v", err) - } - - runCtx, event := s.runTask(context.Background()) - - if traceID, ok := runCtx.Value(log.TraceIDKey).(string); !ok || traceID == "" { - t.Fatalf("expected trace ID in run context, got %#v", runCtx.Value(log.TraceIDKey)) - } - - if got := event.Name(); got != taskRunEventName { - t.Fatalf("expected event name %q, got %q", taskRunEventName, got) - } - - if got, ok := event.Attr("scheduler.cronExpr"); !ok || got != "@hourly" { - t.Fatalf("expected scheduler.cronExpr attr, got %#v, exists=%v", got, ok) - } - - if got := event.Level(); got != slog.LevelInfo { - t.Fatalf("expected event level %v, got %v", slog.LevelInfo, got) - } - - steps := eventSteps(t, event) - if len(steps) != 2 { - t.Fatalf("expected 2 steps, got %d", len(steps)) - } - - if steps[0]["name"] != "scheduler task started" { - t.Fatalf("expected first step to be start, got %#v", steps[0]) - } - - if steps[1]["name"] != "scheduler task finished" { - t.Fatalf("expected second step to be finish, got %#v", steps[1]) - } - - if errorsList := eventErrors(t, event); len(errorsList) != 0 { - t.Fatalf("expected no errors, got %#v", errorsList) - } -} - -func TestRunTaskError(t *testing.T) { - t.Parallel() - - s, err := New("@daily", application.RunnerFunc(func(_ context.Context) error { - return errors.New("boom") - })) - if err != nil { - t.Fatalf("failed to create scheduler: %v", err) - } - - _, event := s.runTask(context.Background()) - - if got := event.Level(); got != slog.LevelError { - t.Fatalf("expected event level %v, got %v", slog.LevelError, got) - } - - steps := eventSteps(t, event) - if len(steps) != 2 { - t.Fatalf("expected 2 steps, got %d", len(steps)) - } - - if steps[1]["name"] != "scheduler task failed" { - t.Fatalf("expected failure step, got %#v", steps[1]) - } - - errorsList := eventErrors(t, event) - if len(errorsList) != 1 { - t.Fatalf("expected 1 error, got %d", len(errorsList)) - } - - if errorsList[0]["error"] != "scheduler task failed: boom" { - t.Fatalf("unexpected error payload: %#v", errorsList[0]) - } -} - -func eventSteps(t *testing.T, event *log.Event) []map[string]any { - t.Helper() - - for _, attr := range event.ToAttrs() { - if attr.Key == "steps" { - steps, ok := attr.Value.Any().([]map[string]any) - if !ok { - t.Fatalf("expected []map[string]any for steps, got %T", attr.Value.Any()) - } - - return steps - } - } - - return nil -} - -func eventErrors(t *testing.T, event *log.Event) []map[string]any { - t.Helper() - - for _, attr := range event.ToAttrs() { - if attr.Key == "errors" { - errorsList, ok := attr.Value.Any().([]map[string]any) - if !ok { - t.Fatalf("expected []map[string]any for errors, got %T", attr.Value.Any()) - } - - return errorsList - } - } - - return nil -} From af7ac16d292a407b7d51066b64c5f7a2f254d8ce Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:19:16 +0300 Subject: [PATCH 6/8] Update scheduler.go --- scheduler/scheduler.go | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 25ec8f8..8f5ffb1 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -17,8 +17,6 @@ import ( var errEmptyCronExpression = errors.New("cron expression cannot be empty") -const taskRunEventName = "scheduler.task.run" - const cronParseOptions = cron.Minute | cron.Hour | cron.Dom | @@ -80,8 +78,17 @@ func (s *Scheduler) Run(ctx context.Context) error { // Wrap runner to maintain consistent logging with trace IDs _, err := cronScheduler.AddFunc(s.cronExpr, func() { - runCtx, event := s.runTask(ctx) - log.WriteEvent(runCtx, event) + event := log.NewEvent("scheduler.task.run") + + runCtx := context.WithValue(ctx, log.TraceIDKey, uuid.NewString()) + runCtx = context.WithValue(runCtx, log.WideEventKey, event) + + event.AddStep(slog.LevelInfo, "scheduler task started") + + err := s.runner.Run(runCtx) + event.AddError(err) + + event.AddStep(slog.LevelInfo, "scheduler task finished") }) if err != nil { return fmt.Errorf("failed to add cron task: %w", err) @@ -96,24 +103,3 @@ func (s *Scheduler) Run(ctx context.Context) error { return fmt.Errorf("scheduler context canceled: %w", ctx.Err()) } - -func (s *Scheduler) runTask(ctx context.Context) (context.Context, *log.Event) { - runCtx := context.WithValue(ctx, log.TraceIDKey, uuid.NewString()) - event := log.NewEvent(taskRunEventName) - event.AddAttrs(map[string]any{ - "scheduler.cronExpr": s.cronExpr, - }) - event.AddStep(slog.LevelInfo, "scheduler task started") - - err := s.runner.Run(runCtx) - if err != nil { - event.AddStep(slog.LevelError, "scheduler task failed") - event.AddError(fmt.Errorf("scheduler task failed: %w", err)) - - return runCtx, event - } - - event.AddStep(slog.LevelInfo, "scheduler task finished") - - return runCtx, event -} From b978688f0bce28d4682edf3ef79f12ee1d784f0a Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:35:56 +0300 Subject: [PATCH 7/8] revert changes in log --- log/log.go | 41 ----------------------------------------- log/wideevent_logger.go | 2 -- 2 files changed, 43 deletions(-) diff --git a/log/log.go b/log/log.go index 92d47d7..4b8ccb5 100644 --- a/log/log.go +++ b/log/log.go @@ -21,10 +21,6 @@ type logger interface { ErrorContext(ctx context.Context, msg string, args ...any) } -type wideEventWriter interface { - WriteEvent(ctx context.Context, e *Event) -} - // Logger is the default logger instance used by package-level logging functions. var Logger logger = New(os.Stdout, "text", slog.LevelInfo, nil) //nolint:gochecknoglobals @@ -33,43 +29,6 @@ func SetDefault(l logger) { Logger = l } -// WriteEvent writes a finalized wide event when the default logger supports it. -func WriteEvent(ctx context.Context, e *Event) { - if e == nil { - return - } - - writer, ok := Logger.(wideEventWriter) - if ok { - writer.WriteEvent(ctx, e) - return - } - - e.Finish() - - args := eventArgs(e.ToAttrs()) - - switch level := e.Level(); { - case level >= slog.LevelError: - Logger.ErrorContext(ctx, e.Name(), args...) - case level >= slog.LevelWarn: - Logger.WarnContext(ctx, e.Name(), args...) - case level >= slog.LevelInfo: - Logger.InfoContext(ctx, e.Name(), args...) - default: - Logger.DebugContext(ctx, e.Name(), args...) - } -} - -func eventArgs(attrs []slog.Attr) []any { - args := make([]any, 0, len(attrs)) - for _, attr := range attrs { - args = append(args, attr) - } - - return args -} - type contextKey string const ( diff --git a/log/wideevent_logger.go b/log/wideevent_logger.go index fef2503..2350674 100644 --- a/log/wideevent_logger.go +++ b/log/wideevent_logger.go @@ -19,8 +19,6 @@ const ( simpleLogEventName = "log.record" ) -var _ logger = (*WideEventLogger)(nil) - // NewWideEventLogger creates a wide-event logger. func NewWideEventLogger(w io.Writer, s Sampler, loggerType string, contextKeys map[string]any) *WideEventLogger { // If no sampler provided, use a keep-all sampler to prevent nil panics From ef22c06a2e1337a9a49b897064d8575df1f093f3 Mon Sep 17 00:00:00 2001 From: Denis Mishankov Date: Thu, 19 Mar 2026 20:35:59 +0300 Subject: [PATCH 8/8] Update processor.go --- queue/processor.go | 84 +++++++--------------------------------------- 1 file changed, 12 insertions(+), 72 deletions(-) diff --git a/queue/processor.go b/queue/processor.go index b55e738..fa25012 100644 --- a/queue/processor.go +++ b/queue/processor.go @@ -2,9 +2,7 @@ package queue import ( "context" - "errors" "fmt" - "log/slog" "sync" "time" @@ -12,13 +10,6 @@ import ( "github.com/platforma-dev/platforma/log" ) -const ( - processorRunEventName = "queue.processor.run" - workerRunEventName = "queue.worker.run" -) - -var errWorkerPanicRecovered = errors.New("worker panic recovered") - // Handler defines the interface for processing jobs. type Handler[T any] interface { Handle(ctx context.Context, job T) @@ -65,19 +56,11 @@ func (p *Processor[T]) Enqueue(ctx context.Context, job T) error { // Run starts the queue processor and blocks until all workers complete. func (p *Processor[T]) Run(ctx context.Context) error { - runEvent := newProcessorRunEvent(p.workersAmount, p.shutdownTimeout) - defer log.WriteEvent(ctx, runEvent) - - runEvent.AddStep(slog.LevelInfo, "opening queue") - err := p.queue.Open(ctx) if err != nil { - runEvent.AddError(fmt.Errorf("failed to open queue: %w", err)) return fmt.Errorf("failed to open queue: %w", err) } - runEvent.AddStep(slog.LevelInfo, "starting workers") - p.wg.Add(p.workersAmount) for range p.workersAmount { workerCtx := context.WithValue(ctx, log.WorkerIDKey, uuid.NewString()) @@ -87,72 +70,33 @@ func (p *Processor[T]) Run(ctx context.Context) error { p.wg.Wait() - runEvent.AddStep(slog.LevelInfo, "all workers shut down") + log.InfoContext(ctx, "all workers shut down") err = p.queue.Close(ctx) if err != nil { - runEvent.AddError(fmt.Errorf("failed to close queue: %w", err)) return fmt.Errorf("failed to close queue: %w", err) } - runEvent.AddStep(slog.LevelInfo, "queue closed") - return nil } func (p *Processor[T]) worker(ctx context.Context) { defer p.wg.Done() - log.WriteEvent(ctx, p.runWorker(ctx)) -} - -func newProcessorRunEvent(workersAmount int, shutdownTimeout time.Duration) *log.Event { - event := log.NewEvent(processorRunEventName) - event.AddAttrs(map[string]any{ - "queue.workersAmount": workersAmount, - "queue.shutdownTimeout": shutdownTimeout, - }) - - return event -} - -func newWorkerRunEvent(shutdownTimeout time.Duration) *log.Event { - event := log.NewEvent(workerRunEventName) - event.AddAttrs(map[string]any{ - "queue.shutdownTimeout": shutdownTimeout, - }) - - return event -} - -func (p *Processor[T]) runWorker(ctx context.Context) (event *log.Event) { - event = newWorkerRunEvent(p.shutdownTimeout) - processedJobs := 0 - drainedJobs := 0 - - defer func() { - event.AddAttrs(map[string]any{ - "queue.processedJobs": processedJobs, - "queue.drainedJobs": drainedJobs, - }) - event.AddStep(slog.LevelInfo, "worker finished") - }() + defer log.InfoContext(ctx, "worker finished") defer func() { if r := recover(); r != nil { - event.AddStep(slog.LevelError, "worker panic recovered") - event.AddError(fmt.Errorf("%w: %v", errWorkerPanicRecovered, r)) + log.ErrorContext(ctx, "worker panic recovered", "panic", r) } }() - event.AddStep(slog.LevelInfo, "worker started") + log.InfoContext(ctx, "worker started") jobChan, err := p.queue.GetJobChan(ctx) if err != nil { - event.AddError(fmt.Errorf("failed to get job chan: %w", err)) - return event + log.ErrorContext(ctx, "failed to get job chan", "error", err) + return } - event.AddStep(slog.LevelInfo, "job channel opened") - // we first check for ctx.Done() in separate select statement // because select statements choose randomly if both cases are ready for { @@ -160,16 +104,15 @@ func (p *Processor[T]) runWorker(ctx context.Context) (event *log.Event) { select { case <-ctx.Done(): - event.AddStep(slog.LevelInfo, "shutdown requested") + log.InfoContext(ctx, "skipping job due to shutdown") breakLoop = true default: select { case job := <-jobChan: p.handler.Handle(ctx, job) - processedJobs++ case <-ctx.Done(): - event.AddStep(slog.LevelInfo, "shutdown requested") + log.InfoContext(ctx, "shutting down worker") breakLoop = true } } @@ -181,8 +124,6 @@ func (p *Processor[T]) runWorker(ctx context.Context) (event *log.Event) { // after context is cancelled we try to drain remaining jobs from channel // before shutdown time expired - event.AddStep(slog.LevelInfo, "draining remaining jobs") - shutdownCtx := context.WithoutCancel(ctx) shutdownCtx, cancel := context.WithTimeout(shutdownCtx, p.shutdownTimeout) defer cancel() @@ -191,16 +132,15 @@ func (p *Processor[T]) runWorker(ctx context.Context) (event *log.Event) { for { select { case <-shutdownCtx.Done(): - event.AddStep(slog.LevelInfo, "shutdown timeout expired") - return event + log.InfoContext(shutdownCtx, "shutdown timeout expired") + return default: select { case job := <-jobChan: p.handler.Handle(shutdownCtx, job) - drainedJobs++ case <-shutdownCtx.Done(): - event.AddStep(slog.LevelInfo, "shutdown timeout expired") - return event + log.InfoContext(shutdownCtx, "shutdown timeout expired") + return } } }