feat: persistent retrieval stats with batched flush#27
Conversation
- Create RetrievalStatsLogStore with bounded channel + background flusher - Batch inserts every 5s or 50 entries, non-blocking drop on full channel - Wire into recordRetrievalStatsExtended (search_request, context_injection, observations_served, stale_excluded, fresh_count, duplicates_removed) - Update handleGetRetrievalStats to aggregate from DB with since param, fall back to in-memory stats - Add 90-day cleanup for retrieval_stats_log in maintenance handler - Add retrievalStatsLogStore to reinitializeDatabase path - Update frontend RetrievalStatsResponse type with new fields
WalkthroughДобавлена постоянная база данных для сбора статистики извлечения (retrieval statistics) с фоновым сохранением событий в пакетах. Реализована логика очистки старых логов, интеграция с обработчиками запросов и обновлен API интерфейс для поддержки временных фильтров. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Handler
participant Service
participant Store as RetrievalStatsLogStore
participant DB as Database
Client->>Handler: GET /retrieval-stats?since=<time>
activate Handler
Handler->>Service: initMu.RLock()
Handler->>Service: retrievalStatsLogStore
Service-->>Handler: logStore reference
Handler->>Service: initMu.RUnlock()
alt Store Available
Handler->>Store: GetStats(ctx, project, since)
activate Store
Store->>DB: Query retrieval_stats_log<br/>GROUP BY event_type
DB-->>Store: Rows with sums
Store->>Store: Aggregate into<br/>AggregatedRetrievalStats
Store-->>Handler: Stats
deactivate Store
else Store Unavailable
Handler->>Service: GetRetrievalStats(project)<br/>(in-memory, no time filter)
Service-->>Handler: Stats
end
Handler-->>Client: JSON Response
deactivate Handler
rect rgba(100, 150, 200, 0.5)
Note over Service,DB: Background: Event Logging & Flushing
Service->>Store: LogEvent(project, eventType, count)<br/>(non-blocking channel)
Store->>Store: Buffer event in queue
Store->>Store: Flusher goroutine<br/>periodically flushes
Store->>DB: CreateInBatches(entries)
DB-->>Store: OK
end
rect rgba(100, 200, 100, 0.5)
Note over Service,DB: Background: Cleanup (90-day retention)
Service->>Store: Cleanup(ctx, 90*24*hour)
Store->>DB: DELETE FROM retrieval_stats_log<br/>WHERE created_at < cutoff
DB-->>Store: rows affected count
Store-->>Service: count, error
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Warning Review ran into problems🔥 ProblemsGit: Failed to clone repository. Please run the Comment |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the analytics capabilities by introducing persistent storage for retrieval statistics. It transitions from an ephemeral in-memory system to a robust database-backed solution, featuring efficient batched writes and automated data cleanup. This change provides more reliable and comprehensive historical data for dashboard analytics, allowing for time-based querying and improved data retention. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
@coderabbitai review |
|
@gemini-code-assist review |
✅ Actions performedReview triggered.
|
|
@codex review |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
internal/worker/handlers_data.go (1)
438-443: Рассмотрите возврат ошибки для невалидного параметраsince.При невалидном формате даты (не RFC3339) ошибка парсинга игнорируется, и запрос возвращает статистику за всё время. Это может ввести пользователя в заблуждение.
Для консистентности с
handleGetSearchAnalytics(строки 524-530), где аналогичная ошибка возвращает HTTP 400, рекомендуется добавить валидацию:♻️ Предлагаемое исправление
if logStore != nil { var since time.Time if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { - if t, err := time.Parse(time.RFC3339, sinceStr); err == nil { - since = t + t, err := time.Parse(time.RFC3339, sinceStr) + if err != nil { + http.Error(w, "invalid 'since' parameter: "+err.Error(), http.StatusBadRequest) + return } + since = t }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/worker/handlers_data.go` around lines 438 - 443, The handler currently swallows time.Parse errors for the "since" query (code using sinceStr := r.URL.Query().Get("since") and time.Parse(time.RFC3339, sinceStr)), causing invalid dates to be treated as "no since" and returning all-time stats; change this to validate the parse and return an HTTP 400 with a clear message when parsing fails (mirror the behavior in handleGetSearchAnalytics which checks the parse and responds with a 400), i.e., if time.Parse returns an error for sinceStr, call the same error response path instead of ignoring the error.internal/db/gorm/retrieval_stats_log_store.go (1)
101-106: Рассмотрите добавление таймаута для операций flush при завершении работы.Метод
flushвыполняет запись в БД без контекста с таймаутом. При медленной базе данных или сетевых проблемах операцияClose()может зависнуть на неопределённое время, блокируя graceful shutdown.♻️ Предлагаемое исправление
-func (s *RetrievalStatsLogStore) flush(entries []RetrievalStatsLogEntry) { - if err := s.db.CreateInBatches(entries, len(entries)).Error; err != nil { +func (s *RetrievalStatsLogStore) flush(entries []RetrievalStatsLogEntry) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.db.WithContext(ctx).CreateInBatches(entries, len(entries)).Error; err != nil { log.Warn().Err(err).Int("batch_size", len(entries)).Msg("failed to flush retrieval stats batch") } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/db/gorm/retrieval_stats_log_store.go` around lines 101 - 106, The flush method does DB writes without a timeout, risking Close() hanging; modify RetrievalStatsLogStore.flush to create a context with a timeout (e.g., context.WithTimeout(ctx, 5*time.Second)) and call s.db.WithContext(ctx).CreateInBatches(entries, len(entries)) so the DB operation is cancellable; also update whichever Close() or shutdown path calls flush to either pass through a parent context or use the same bounded timeout and ensure the cancel() is deferred to free resources; make the timeout a configurable constant or field (e.g., flushTimeout) on RetrievalStatsLogStore for testability.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/db/gorm/retrieval_stats_log_store.go`:
- Around line 101-106: The flush method does DB writes without a timeout,
risking Close() hanging; modify RetrievalStatsLogStore.flush to create a context
with a timeout (e.g., context.WithTimeout(ctx, 5*time.Second)) and call
s.db.WithContext(ctx).CreateInBatches(entries, len(entries)) so the DB operation
is cancellable; also update whichever Close() or shutdown path calls flush to
either pass through a parent context or use the same bounded timeout and ensure
the cancel() is deferred to free resources; make the timeout a configurable
constant or field (e.g., flushTimeout) on RetrievalStatsLogStore for
testability.
In `@internal/worker/handlers_data.go`:
- Around line 438-443: The handler currently swallows time.Parse errors for the
"since" query (code using sinceStr := r.URL.Query().Get("since") and
time.Parse(time.RFC3339, sinceStr)), causing invalid dates to be treated as "no
since" and returning all-time stats; change this to validate the parse and
return an HTTP 400 with a clear message when parsing fails (mirror the behavior
in handleGetSearchAnalytics which checks the parse and responds with a 400),
i.e., if time.Parse returns an error for sinceStr, call the same error response
path instead of ignoring the error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 48b974ea-2fd8-4492-97bc-ee575d1707bf
📒 Files selected for processing (5)
internal/db/gorm/retrieval_stats_log_store.gointernal/worker/handlers_data.gointernal/worker/handlers_maintenance.gointernal/worker/service.goui/src/utils/api.ts
There was a problem hiding this comment.
Code Review
This pull request introduces a robust mechanism for persisting retrieval statistics using a batched flushing approach, which is an excellent design for performance. The integration into the existing service, including API updates, maintenance tasks, and a fallback to in-memory stats, is well-thought-out. My review provides suggestions to enhance maintainability by replacing magic values with constants, improving error handling for client-provided parameters, and simplifying redundant code.
| switch r.EventType { | ||
| case "search_request": | ||
| stats.SearchRequests = r.Total | ||
| stats.TotalRequests += r.Total | ||
| case "context_injection": | ||
| stats.ContextInjections = r.Total | ||
| stats.TotalRequests += r.Total | ||
| case "observations_served": | ||
| stats.ObservationsServed = r.Total | ||
| case "stale_excluded": | ||
| stats.StaleExcluded = r.Total | ||
| case "fresh_count": | ||
| stats.FreshCount = r.Total | ||
| case "duplicates_removed": | ||
| stats.DuplicatesRemoved = r.Total | ||
| } |
There was a problem hiding this comment.
The event type strings like "search_request" are hardcoded. This makes the code prone to typos and harder to maintain. It's a good practice to define these as constants and export them so they can be used consistently across packages. For example:
const (
RetrievalStatSearchRequest = "search_request"
RetrievalStatContextInjection = "context_injection"
// ... and so on for other event types
)These constants can then be used here and in internal/worker/service.go.
| if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { | ||
| if t, err := time.Parse(time.RFC3339, sinceStr); err == nil { | ||
| since = t | ||
| } | ||
| } |
There was a problem hiding this comment.
If parsing the since timestamp fails, the error is silently ignored. This results in falling back to all-time stats, which can be confusing for the user who intended to filter by a time range. It would be better to log a warning when parsing fails to make this behavior explicit and aid in debugging client-side issues.
| if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { | |
| if t, err := time.Parse(time.RFC3339, sinceStr); err == nil { | |
| since = t | |
| } | |
| } | |
| if t, err := time.Parse(time.RFC3339, sinceStr); err != nil { | |
| log.Warn().Err(err).Str("since", sinceStr).Msg("invalid 'since' parameter format, ignoring") | |
| } else { | |
| since = t | |
| } |
| if rsStore == nil { | ||
| return | ||
| } | ||
| deleted, err := rsStore.Cleanup(context.Background(), 90*24*time.Hour) |
| if logStore != nil { | ||
| if isSearch { | ||
| logStore.LogEvent(project, "search_request", 1) | ||
| } else { | ||
| logStore.LogEvent(project, "context_injection", 1) | ||
| } | ||
| if served > 0 { | ||
| logStore.LogEvent(project, "observations_served", int(served)) | ||
| } | ||
| if staleExcluded > 0 { | ||
| logStore.LogEvent(project, "stale_excluded", int(staleExcluded)) | ||
| } | ||
| if freshCount > 0 { | ||
| logStore.LogEvent(project, "fresh_count", int(freshCount)) | ||
| } | ||
| if duplicatesRemoved > 0 { | ||
| logStore.LogEvent(project, "duplicates_removed", int(duplicatesRemoved)) | ||
| } | ||
| } |
There was a problem hiding this comment.
This block can be improved in two ways:
- The event type strings are hardcoded. It's better to use constants (as suggested in
retrieval_stats_log_store.go) for consistency and maintainability. - The
if ... > 0checks are redundant becauseLogEventalready handlescount <= 0.
By addressing both, the code becomes cleaner and more robust.
if logStore != nil {
if isSearch {
logStore.LogEvent(project, gorm.RetrievalStatSearchRequest, 1)
} else {
logStore.LogEvent(project, gorm.RetrievalStatContextInjection, 1)
}
logStore.LogEvent(project, gorm.RetrievalStatObservationsServed, int(served))
logStore.LogEvent(project, gorm.RetrievalStatStaleExcluded, int(staleExcluded))
logStore.LogEvent(project, gorm.RetrievalStatFreshCount, int(freshCount))
logStore.LogEvent(project, gorm.RetrievalStatDuplicatesRemoved, int(duplicatesRemoved))
}There was a problem hiding this comment.
Code Review
This pull request introduces a robust mechanism for persisting retrieval statistics using a batched background flusher, which is a significant enhancement for analytics. The implementation is solid, with a new API endpoint for querying time-ranged stats and a corresponding cleanup job. My review includes suggestions to further improve maintainability and API robustness by using constants for shared string literals, refactoring for code clarity, and adding stricter validation for API parameters.
| batch := make([]RetrievalStatsLogEntry, 0, retrievalStatsFlushSize) | ||
|
|
||
| for { | ||
| select { | ||
| case entry, ok := <-s.ch: | ||
| if !ok { | ||
| // Channel closed — flush remaining and exit. | ||
| if len(batch) > 0 { | ||
| s.flush(batch) | ||
| } | ||
| return | ||
| } | ||
| batch = append(batch, entry) | ||
| if len(batch) >= retrievalStatsFlushSize { | ||
| s.flush(batch) | ||
| batch = batch[:0] | ||
| } | ||
| case <-ticker.C: | ||
| if len(batch) > 0 { | ||
| s.flush(batch) | ||
| batch = batch[:0] | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The flusher function has some duplicated code for flushing the batch. You can refactor this to improve maintainability by extracting the flushing logic into a local helper function.
batch := make([]RetrievalStatsLogEntry, 0, retrievalStatsFlushSize)
flushBatch := func() {
if len(batch) > 0 {
s.flush(batch)
batch = batch[:0]
}
}
for {
select {
case entry, ok := <-s.ch:
if !ok {
// Channel closed — flush remaining and exit.
flushBatch()
return
}
batch = append(batch, entry)
if len(batch) >= retrievalStatsFlushSize {
flushBatch()
}
case <-ticker.C:
flushBatch()
}
}| switch r.EventType { | ||
| case "search_request": | ||
| stats.SearchRequests = r.Total | ||
| stats.TotalRequests += r.Total | ||
| case "context_injection": | ||
| stats.ContextInjections = r.Total | ||
| stats.TotalRequests += r.Total | ||
| case "observations_served": | ||
| stats.ObservationsServed = r.Total | ||
| case "stale_excluded": | ||
| stats.StaleExcluded = r.Total | ||
| case "fresh_count": | ||
| stats.FreshCount = r.Total | ||
| case "duplicates_removed": | ||
| stats.DuplicatesRemoved = r.Total | ||
| } |
There was a problem hiding this comment.
The event type strings like "search_request" are hardcoded here and also in internal/worker/service.go. This is prone to typos and makes maintenance harder. It would be better to define these as exported constants in this package and use them consistently.
Example:
const (
EventTypeSearchRequest = "search_request"
EventTypeContextInjection = "context_injection"
// ... and so on for other event types
)| if sinceStr := r.URL.Query().Get("since"); sinceStr != "" { | ||
| if t, err := time.Parse(time.RFC3339, sinceStr); err == nil { | ||
| since = t | ||
| } | ||
| } |
There was a problem hiding this comment.
If parsing the since parameter fails, the error is silently ignored, and the endpoint falls back to returning all-time stats. This could be confusing for API users who provide an invalid timestamp and expect an error or a filtered response. It's better to return a 400 Bad Request error to provide clear feedback.
if sinceStr := r.URL.Query().Get("since"); sinceStr != "" {
t, err := time.Parse(time.RFC3339, sinceStr)
if err != nil {
http.Error(w, "invalid 'since' parameter format, must be RFC3339", http.StatusBadRequest)
return
}
since = t
}| if rsStore == nil { | ||
| return | ||
| } | ||
| deleted, err := rsStore.Cleanup(context.Background(), 90*24*time.Hour) |
There was a problem hiding this comment.
Summary
RetrievalStatsLogStorewith bounded channel + background batched flusherrecordRetrievalStatsExtended/api/stats/retrievalto aggregate from DB withsinceparameterretrieval_stats_logtableDetails
Close()drains remaining entriesRetrievalStatsResponsetype extended with new fieldsTest plan
/api/stats/retrieval?since=<ISO8601>and verify aggregated dataSummary by CodeRabbit
Релиз