feat(async_search): gen proto #4
9b379f5 to c598c4b
📝 Walkthrough
Introduce a persistent, quota- and retention-aware async-search subsystem: proto endpoints expanded/renamed (start/fetch/cancel/delete/list), richer request/response schema (retention, status enum, timestamps, progress, disk usage), on-disk QPR encoding/merge, multi-worker processing, maintenance loop, and broad proxy/store/grpc/mapping/marshaler/test updates.
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 0
♻️ Duplicate comments (1)
api/seqproxyapi/v1/seq_proxy_api.proto (1)
269-280: Add AsyncSearchStatusPending to reflect queued searches
[dKharms already raised the same point]
🧹 Nitpick comments (4)
api/storeapi/store_api.proto (2)
160-165: Consider adding a PENDING/QUEUED state to AsyncSearchStatus
The lifecycle currently jumps from InProgress → Done/Canceled/Error. A distinct Pending (or Queued) enum value helps front-ends distinguish "accepted but not started" from "actively running". This was requested in previous discussions inside the proxy API file; mirroring it here keeps both layers symmetrical.
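To make the suggested lifecycle concrete, here is a minimal Go sketch of a status type with an extra Pending value and the transitions it would allow. The type and constant names are hypothetical stand-ins, not the generated proto identifiers, and the transition table is an assumption about how a scheduler might behave. In the proto itself a new value would normally be appended with a fresh number rather than inserted at zero.

```go
package main

import "fmt"

// SearchStatus is a hypothetical mirror of AsyncSearchStatus with an extra
// Pending value. The names are illustrative, not the generated ones.
type SearchStatus int

const (
	StatusPending    SearchStatus = iota // accepted, waiting for a worker
	StatusInProgress                     // a worker is processing fractions
	StatusDone
	StatusCanceled
	StatusError
)

// IsTerminal reports whether the search can no longer change state.
func (s SearchStatus) IsTerminal() bool {
	return s == StatusDone || s == StatusCanceled || s == StatusError
}

// CanTransition encodes the assumed lifecycle:
// Pending -> InProgress | Canceled, InProgress -> Done | Canceled | Error.
func CanTransition(from, to SearchStatus) bool {
	switch from {
	case StatusPending:
		return to == StatusInProgress || to == StatusCanceled
	case StatusInProgress:
		return to.IsTerminal()
	default:
		return false // terminal states never change
	}
}

func main() {
	fmt.Println(CanTransition(StatusPending, StatusInProgress))
	fmt.Println(CanTransition(StatusDone, StatusInProgress))
}
```

With a Pending value, a front-end could show "queued" for accepted searches without guessing from progress counters.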
167-179: Progress metrics diverge from proxy API: unify to avoid translation glitches
Store layer returns fracs_done/fracs_queue as uint64, whereas the proxy layer (see seq_proxy_api.proto Lines 288-290) exposes a single double progress. The gateway now needs bespoke conversion logic and loses precision on large totals. Aligning on one representation (e.g., double progress or {done,total} ints) will simplify marshaling and reduce the chance of rounding/overflow bugs.
api/seqproxyapi/v1/seq_proxy_api.proto (2)
282-292: Mismatch with store-layer progress counters
Proxy response exposes double progress, whereas the store response uses integer fracs_done/fracs_queue. Decide on a single format (preferably percentage as double progress for human-readability) or document the conversion rules in the gateway.
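If the two representations stay, the conversion rule could be documented with something as small as the sketch below. It assumes that fracs_queue counts fractions still waiting (so the total is done plus queue) and that an empty search is reported as complete; both are assumptions, not confirmed behavior of the gateway. The function name is hypothetical.

```go
package main

import "fmt"

// ProgressFromCounters converts store-style counters into a fraction in [0, 1].
// Assumption: fracsQueue is the number of fractions still waiting, so the
// total amount of work is fracsDone + fracsQueue.
func ProgressFromCounters(fracsDone, fracsQueue uint64) float64 {
	// Convert to float64 before adding so the sum cannot wrap around uint64.
	total := float64(fracsDone) + float64(fracsQueue)
	if total == 0 {
		// Nothing to process; this sketch treats that as complete.
		return 1
	}
	return float64(fracsDone) / total
}

func main() {
	fmt.Println(ProgressFromCounters(3, 1))
	fmt.Println(ProgressFromCounters(0, 5))
}
```

Writing the rule down once, next to the proto comment, would keep the store and proxy layers from drifting apart.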
237-250: Retention is mandatory but undocumented default: clarify or make optional
retention is required (field 1). If a client omits it, should the request be rejected or should the server fall back to a sensible default (e.g., 24 h)? Either document the default explicitly in comments or mark the field optional to convey that the server may apply a policy value.
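One way to express the "server may apply a policy value" option is sketched below. The default and maximum are placeholder values chosen for illustration (the 24 h figure echoes the example above), and ResolveRetention is a hypothetical helper, not a function from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	defaultRetention = 24 * time.Hour      // assumed policy default
	maxRetention     = 30 * 24 * time.Hour // assumed upper bound
)

// ErrInvalidRetention is returned for non-positive or oversized values.
var ErrInvalidRetention = errors.New("retention must be positive and within the allowed maximum")

// ResolveRetention applies the policy: a nil value (field omitted) falls back
// to the default, an explicit value is validated against the bounds.
func ResolveRetention(requested *time.Duration) (time.Duration, error) {
	if requested == nil {
		return defaultRetention, nil
	}
	if *requested <= 0 || *requested > maxRetention {
		return 0, ErrInvalidRetention
	}
	return *requested, nil
}

func main() {
	d, err := ResolveRetention(nil)
	fmt.Println(d, err)

	week := 7 * 24 * time.Hour
	d, err = ResolveRetention(&week)
	fmt.Println(d, err)
}
```

Whichever rule is chosen, stating it in the proto comment lets clients know whether omitting retention is an error.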
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
- pkg/seqproxyapi/v1/seq_proxy_api.pb.go is excluded by !**/*.pb.go
- pkg/seqproxyapi/v1/seq_proxy_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/storeapi/store_api_vtproto.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (2)
- api/seqproxyapi/v1/seq_proxy_api.proto (4 hunks)
- api/storeapi/store_api.proto (2 hunks)
🔇 Additional comments (1)
api/storeapi/store_api.proto (1)
140-149: Consider making search_id server-generated to avoid collisions
The search_id field is currently required in StartAsyncSearchRequest (api/storeapi/store_api.proto:141) and directly drives request routing and storage in:
- storeapi/grpc_async_search.go (r.SearchId)
- storeapi/client.go (inMemoryAPIClient) and its mocks (store_api_client_mock.go)
- proxy HTTP paths in seq_proxy_api.proto and generated handlers (/async-searches/{search_id})
Switching to a server-generated ID will be a breaking API change. You'll need to:
• Return the new ID in StartAsyncSearchResponse
• Update all client, mock, proxy and in-memory implementations to consume the returned ID
• Adjust gRPC and HTTP routing to use the generated ID rather than expect it on the request
Please review every usage of search_id across the codebase before proceeding with this refactor.
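For reference, generating the identifier on the server can be as small as the sketch below. It uses a random 128-bit value encoded as hex; the ID format, the NewSearchID helper and the startResponse type are illustrative assumptions, not what the PR implements.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// NewSearchID returns a random 128-bit identifier as 32 hex characters.
// The format is an assumption; any collision-resistant scheme would do.
func NewSearchID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", fmt.Errorf("generate search id: %w", err)
	}
	return hex.EncodeToString(b[:]), nil
}

// startResponse is a hypothetical stand-in for StartAsyncSearchResponse
// carrying the generated ID back to the caller.
type startResponse struct {
	SearchID string
}

// startSearch sketches the server side: generate the ID, then return it.
func startSearch() (startResponse, error) {
	id, err := NewSearchID()
	if err != nil {
		return startResponse{}, err
	}
	return startResponse{SearchID: id}, nil
}

func main() {
	resp, err := startSearch()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(resp.SearchID))
}
```

Clients would then route later fetch, cancel and delete calls by the returned ID instead of one they chose themselves.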
c598c4b to 7168308
Actionable comments posted: 2
♻️ Duplicate comments (2)
api/storeapi/store_api.proto (1)
160-165: Missing AsyncSearchStatusPending (already raised earlier)
Same feedback as in the proxy schema: a Pending state can be handy for queued tasks.
api/seqproxyapi/v1/seq_proxy_api.proto (1)
269-280: Pending status again
See earlier remark: a Pending enum could future-proof scheduling.
🧹 Nitpick comments (7)
api/storeapi/store_api.proto (4)
20-23: Prefer re-using google.protobuf.Empty for empty responses
CancelAsyncSearchResponse and DeleteAsyncSearchResponse carry no payload. You could simplify the API surface by re-using google.protobuf.Empty instead of introducing two extra empty messages.
- rpc CancelAsyncSearch(CancelAsyncSearchRequest) returns (CancelAsyncSearchResponse) {}
- rpc DeleteAsyncSearch(DeleteAsyncSearchRequest) returns (DeleteAsyncSearchResponse) {}
+ rpc CancelAsyncSearch(CancelAsyncSearchRequest) returns (google.protobuf.Empty) {}
+ rpc DeleteAsyncSearch(DeleteAsyncSearchRequest) returns (google.protobuf.Empty) {}
153-158: Document default pagination behaviour
size/offset became required ints but no defaults or range checks are described.
Please add a short comment (or google.api.field_behavior) clarifying defaults and max limits to avoid ambiguous implementations.
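As an illustration of what such documentation could pin down, the sketch below normalizes size/offset with an explicit default and cap. The concrete numbers and the NormalizePage helper are assumptions made for the example; the PR does not state any limits.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	defaultPageSize = 100  // assumed default when size is 0
	maxPageSize     = 1000 // assumed hard cap
)

// NormalizePage validates and normalizes pagination inputs:
// negative values are rejected, size 0 means "use the default",
// and oversized requests are clamped to the cap.
func NormalizePage(size, offset int32) (int32, int32, error) {
	if size < 0 || offset < 0 {
		return 0, 0, errors.New("size and offset must be non-negative")
	}
	if size == 0 {
		size = defaultPageSize
	}
	if size > maxPageSize {
		size = maxPageSize
	}
	return size, offset, nil
}

func main() {
	fmt.Println(NormalizePage(0, 5))
	fmt.Println(NormalizePage(5000, 0))
	fmt.Println(NormalizePage(-1, 0))
}
```

Whether oversized requests are clamped or rejected is a policy choice; the point is that the proto comment states it.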
167-179: Progress terminology differs from proxy API
Store API exports fracs_done/fracs_queue as uint64 counters, whereas the proxy API exposes a single progress double in [0,1].
Down-stream services now need bespoke conversion logic. Consider aligning on one representation (preferably a double 0-1 for simplicity).
181-192: Consistency: empty request/response names add noise
CancelAsyncSearchRequest/DeleteAsyncSearchRequest only wrap search_id.
If you keep dedicated empty responses, consider inlining the ID into the RPC path and dropping the wrappers, e.g.:
rpc CancelAsyncSearch(string) returns (google.protobuf.Empty);
(illustrative only: protobuf RPCs must take a message type, so a bare string parameter is not valid proto syntax)
…or keep the wrapper but at least share a single SearchId message across RPCs.
api/seqproxyapi/v1/seq_proxy_api.proto (3)
238-250: with_docs flag: highlight storage impact
Great that the comment warns about disk usage.
Consider adding a concrete soft/hard limit or quota mechanism to prevent unbounded growth.
282-292: Progress granularity
double progress (0-1) is straightforward. Make clear whether 1.0 means all shards successful or any terminal state. Add a note.
300-304: Empty request wrapper
Same as store API: consider a shared SearchId message or direct path param without wrapper.
🔇 Additional comments (5)
api/seqproxyapi/v1/seq_proxy_api.proto (5)
79-82: Pluralised endpoint looks good
Switching to /async-searches aligns with REST conventions and avoids a future breaking change.
88-90: GET on item resource is clear, nice
Fetching via GET /async-searches/{search_id} matches HTTP semantics.
95-97: Why POST for cancel instead of DELETE?
Cancelling is state-destructive and idempotent → DELETE /async-searches/{id} would be the canonical verb.
Any strong reason to use POST …/cancel? If not, consider reverting to DELETE.
100-104: HTTP mapping for delete lacks body: "*", which is good
No body is expected and the verb is correct. LGTM.
253-255: Response now returns search_id: consistent and user-friendly
👍
7168308 to 52f2e68
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main       #4      +/-   ##
==========================================
- Coverage   73.44%   72.34%   -1.11%
==========================================
  Files         193      195       +2
  Lines       16015    17350    +1335
==========================================
+ Hits        11762    12551     +789
- Misses       3662     4115     +453
- Partials      591      684      +93
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
storeapi/grpc_v1.go (1)
24-26: Type mismatch: passing storeapi.MappingProvider to fracmanager.MustStartAsync which expects fracmanager.MappingProvider
These are distinct named interface types. Go accepts the value implicitly only when the method set of storeapi.MappingProvider includes every method of fracmanager.MappingProvider; otherwise this won't compile. Use a type alias or pass the fracmanager interface explicitly.
Minimal, future-proof fix via alias:
- type MappingProvider interface {
-     GetMapping() seq.Mapping
- }
+ // Re-export fracmanager.MappingProvider to avoid interface drift and enable direct use.
+ type MappingProvider = fracmanager.MappingProvider
Alternatively, do a type assertion at the call site (less ideal due to runtime check):
- asyncSearcher: fracmanager.MustStartAsync(cfg.Search.Async, mappingProvider, fracManager.GetAllFracs()),
+ asyncSearcher: fracmanager.MustStartAsync(cfg.Search.Async, mappingProvider.(fracmanager.MappingProvider), fracManager.GetAllFracs()),
Also ensure asyncSearcher is stopped on server shutdown if AsyncSearcher exposes Stop/Close.
Also applies to: 114-115
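The sketch below illustrates how Go treats two separately declared interfaces and the alias form suggested above. All types here are local stand-ins (fmMappingProvider for fracmanager.MappingProvider, storeMappingProvider for storeapi.MappingProvider), and it assumes both declare the same single method; the real fracmanager interface may differ. Under that assumption the value is accepted without any conversion, and the alias mainly protects against the two declarations drifting apart later.

```go
package main

import "fmt"

// Mapping is a stand-in for seq.Mapping.
type Mapping map[string]string

// fmMappingProvider stands in for fracmanager.MappingProvider (assumed shape).
type fmMappingProvider interface {
	GetMapping() Mapping
}

// storeMappingProvider stands in for storeapi.MappingProvider.
type storeMappingProvider interface {
	GetMapping() Mapping
}

// aliasedProvider is the alias form: it is the very same type as
// fmMappingProvider, so the two can never diverge.
type aliasedProvider = fmMappingProvider

// staticProvider is a trivial implementation used for the demo.
type staticProvider struct{ m Mapping }

func (p staticProvider) GetMapping() Mapping { return p.m }

// mustStart stands in for fracmanager.MustStartAsync and only needs
// the fracmanager-side interface.
func mustStart(p fmMappingProvider) int {
	return len(p.GetMapping())
}

func main() {
	var sp storeMappingProvider = staticProvider{m: Mapping{"level": "keyword"}}
	var ap aliasedProvider = staticProvider{m: Mapping{"level": "keyword"}}

	// Identical method sets: the interface value is assignable as-is.
	fmt.Println(mustStart(sp))
	// Alias: no conversion question arises at all.
	fmt.Println(mustStart(ap))
}
```

If fracmanager.MappingProvider ever gains a method that storeapi.MappingProvider lacks, the first call stops compiling while the alias version keeps working, which is the drift the alias guards against.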
♻️ Duplicate comments (4)
api/storeapi/store_api.proto (1)
142-151: Asymmetry: client-supplied search_id; empty response. Prefer server-generated IDs for safety and consistency
This repeats prior feedback: generating IDs server-side avoids collisions/guessing and aligns with the proxy API.
Apply this proto change to shift ID generation to the server:
 message StartAsyncSearchRequest {
-  string search_id = 1;
   google.protobuf.Duration retention = 2;
   string query = 3;
   int64 from = 4;
   int64 to = 5;
   repeated AggQuery aggs = 6;
   int64 histogram_interval = 7;
   bool with_docs = 8;
 }
-message StartAsyncSearchResponse {}
+message StartAsyncSearchResponse {
+  string search_id = 1;
+}
proxy/search/async.go (1)
423-436: Fix the status weight mapping issue.
Same issue as in fracmanager/encoding.go: the slice is being used incorrectly as a map.
api/seqproxyapi/v1/seq_proxy_api.proto (2)
278-289: Consider adding AsyncSearchStatusPending status.
As suggested in the past review comment, a pending status could be useful for future scheduler implementations.
268-276: Validate pagination parameters when with_docs is false.
The comments indicate these parameters are ignored when with_docs is false, but there's no validation to reject non-zero values. This could confuse clients.
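A validation rule along these lines could look like the following sketch. ValidateFetchPaging is a hypothetical helper and the error messages are placeholders; in the real handler the error would presumably be mapped to a gRPC InvalidArgument status.

```go
package main

import (
	"errors"
	"fmt"
)

// ValidateFetchPaging rejects pagination inputs that cannot be honored.
// Assumption: when the search was started without with_docs there are no
// documents to page through, so non-zero size/offset is a client mistake.
func ValidateFetchPaging(withDocs bool, size, offset int32) error {
	if size < 0 || offset < 0 {
		return errors.New("size and offset must be non-negative")
	}
	if !withDocs && (size != 0 || offset != 0) {
		return errors.New("size and offset require a search started with with_docs")
	}
	return nil
}

func main() {
	fmt.Println(ValidateFetchPaging(true, 10, 0))
	fmt.Println(ValidateFetchPaging(false, 10, 0))
}
```

Rejecting the request is stricter than silently ignoring the fields, so it only makes sense while the API is still allowed to break.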
🧹 Nitpick comments (19)
config/config.go (1)
218-222: Add a simple invariant check between MaxTotalSize and MaxSizePerRequest
Nice additions. Consider validating these two to avoid footguns in prod configs (e.g., per-request limit greater than total budget).
Example (place near the other computed defaults in Parse):
// After setting AsyncSearch defaults
if c.AsyncSearch.MaxSizePerRequest > c.AsyncSearch.MaxTotalSize {
    return Config{}, fmt.Errorf("async_search.max_size_per_request (%s) must be <= async_search.max_total_size (%s)",
        units.Base2Bytes(c.AsyncSearch.MaxSizePerRequest), units.Base2Bytes(c.AsyncSearch.MaxTotalSize))
}
api/storeapi/store_api.proto (4)
144-151: Type inconsistency: from/to are int64 here but Timestamps in responses
Start request uses int64, while FetchAsyncSearchResultResponse and AsyncSearchesListItem use google.protobuf.Timestamp for from/to. Consider unifying to Timestamp for symmetry and easier client handling.
157-160: size/offset downgraded to int32: confirm limits or use int64 for consistency
Other requests use int64 for size/offset. If you don't have a strict upper bound, prefer int64 to avoid accidental truncation.
Possible tweak:
 message FetchAsyncSearchResultRequest {
   string search_id = 1;
-  int32 size = 2;
-  int32 offset = 3;
+  int64 size = 2;
+  int64 offset = 3;
   Order order = 4;
 }
162-167: Enum naming style deviates from file conventions
Elsewhere enums use SCREAMING_SNAKE_CASE (e.g., NO_ERROR). Consider switching to ASYNC_SEARCH_STATUS_IN_PROGRESS, etc., for consistency. Not blocking.
210-228: Repeat: remove 'optional' on message Timestamp; consider unifying types
Same 'optional' concern for canceled_at here. Also, consider aligning from/to types with Start request.
Apply:
 message AsyncSearchesListItem {
   string search_id = 1;
   AsyncSearchStatus status = 2;
   google.protobuf.Timestamp started_at = 3;
   google.protobuf.Timestamp expires_at = 4;
-  optional google.protobuf.Timestamp canceled_at = 5;
+  google.protobuf.Timestamp canceled_at = 5;
   uint64 fracs_done = 6;
   uint64 fracs_queue = 7;
   uint64 disk_usage = 8;
   repeated AggQuery aggs = 9;
   int64 histogram_interval = 10;
   string query = 11;
   google.protobuf.Timestamp from = 12;
   google.protobuf.Timestamp to = 13;
   google.protobuf.Duration retention = 14;
   bool with_docs = 15;
 }
proxy/search/ingestor.go (1)
197-210: Minor: avoid keeping backing array when offset exceeds length
ids = ids[:0] retains capacity. Returning nil helps GC and communicates emptiness more clearly.
 func paginateIDs(ids seq.IDSources, offset, size int) (seq.IDSources, int) {
     if len(ids) > offset {
         ids = ids[offset:]
     } else {
-        ids = ids[:0]
+        ids = nil
     }
     if len(ids) > size {
         ids = ids[:size]
     } else {
         size = len(ids)
     }
     return ids, size
 }
util/bufferpool.go (1)
13-24: Add Pool.New factory and simplify Get; optionally expose a constructor
Using sync.Pool.New avoids the nil-branch and centralizes allocation. Note that after this change a zero-value BufferPool (one not created through NewBufferPool) would panic in Get, because the pool returns nil and the type assertion fails, so every call site has to use the constructor.
 type BufferPool struct {
     pool sync.Pool
 }

-func (q *BufferPool) Get() *bytespool.Buffer {
-    v := q.pool.Get()
-    if v != nil {
-        return v.(*bytespool.Buffer)
-    }
-    return new(bytespool.Buffer)
-}
+func NewBufferPool() *BufferPool {
+    return &BufferPool{
+        pool: sync.Pool{
+            New: func() any { return new(bytespool.Buffer) },
+        },
+    }
+}
+
+func (q *BufferPool) Get() *bytespool.Buffer {
+    return q.pool.Get().(*bytespool.Buffer)
+}

 func (q *BufferPool) Put(b *bytespool.Buffer) {
     b.Reset()
     q.pool.Put(b)
 }
pkg/seqproxyapi/v1/marshaler_test.go (1)
102-146: Strengthen JSON comparison to avoid brittle failures due to field order
Relying on exact string equality for complex JSON makes the test fragile. Use JSONEq to compare semantically.
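To show what "compare semantically" means without pulling in a test library, here is a standard-library sketch: both documents are decoded into generic values and compared structurally, so key order and whitespace stop mattering. JSONEqual is a hypothetical helper written for this illustration; it is similar in spirit to testify's JSONEq but is not that function.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// JSONEqual reports whether two JSON documents are semantically equal,
// ignoring object key order and insignificant whitespace.
func JSONEqual(a, b string) (bool, error) {
	var va, vb any
	if err := json.Unmarshal([]byte(a), &va); err != nil {
		return false, fmt.Errorf("first document: %w", err)
	}
	if err := json.Unmarshal([]byte(b), &vb); err != nil {
		return false, fmt.Errorf("second document: %w", err)
	}
	// Decoded objects become map[string]any, so key order is irrelevant;
	// arrays become []any and remain order-sensitive.
	return reflect.DeepEqual(va, vb), nil
}

func main() {
	same, err := JSONEqual(`{"a":1,"b":[1,2]}`, `{ "b": [1, 2], "a": 1 }`)
	fmt.Println(same, err)

	same, err = JSONEqual(`{"b":[1,2]}`, `{"b":[2,1]}`)
	fmt.Println(same, err)
}
```

Array order still matters in this comparison, which is usually what a marshaler test wants.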
- r.Equal(expected, string(raw))
+ require.JSONEq(t, expected, string(raw))
fracmanager/async_searcher_test.go (1)
49-63: Ensure AsyncSearcher is shut down to avoid goroutine leaks
MustStartAsync spawns a maintenance goroutine; this test waits only for processWg. If AsyncSearcher exposes a shutdown/close method, invoke it via t.Cleanup to prevent leaks.
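A shutdown hook of the kind described could follow the pattern sketched below: the background loop listens on a context, and Stop cancels it and waits for the goroutine to exit. The maintainer type and its methods are hypothetical; whether AsyncSearcher has or should have such a method is not shown in this review.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// maintainer is a hypothetical stand-in for a component that runs a
// background maintenance loop, as MustStartAsync is said to do.
type maintainer struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// startMaintainer launches the loop and returns a handle for stopping it.
func startMaintainer(interval time.Duration) *maintainer {
	ctx, cancel := context.WithCancel(context.Background())
	m := &maintainer{cancel: cancel}
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return // shutdown requested
			case <-ticker.C:
				// Periodic maintenance work would run here.
			}
		}
	}()
	return m
}

// Stop cancels the loop and blocks until the goroutine has exited.
// Calling it more than once is harmless.
func (m *maintainer) Stop() {
	m.cancel()
	m.wg.Wait()
}

func main() {
	m := startMaintainer(10 * time.Millisecond)
	m.Stop()
	fmt.Println("stopped")
}
```

In a test, registering the hook right after construction, for example `t.Cleanup(m.Stop)`, makes sure the goroutine is gone before the next test starts.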
fracmanager/encoding_test.go (1)
88-92: Make randomized heavy loop test-friendly under -short
Running up to 100 iterations with up to ~8K entries can be slow. Respect testing.Short() to keep CI fast.
-    for i := 0; i < 100; i++ {
+    n := 100
+    if testing.Short() {
+        n = 5
+    }
+    for i := 0; i < n; i++ {
         r := rand.N(8)
         qpr := getRandomQPR(r * 1024)
         test(qpr)
     }
pkg/seqproxyapi/v1/marshaler.go (1)
231-237: Inconsistent wire format: DiskUsage quoted here, numeric elsewhere.
FetchAsyncSearchResultResponse encodes disk_usage as a JSON string, while AsyncSearchesListItem (and most protojson outputs) encode it as a number. This asymmetry is likely to surprise clients.
Consider making disk_usage consistently numeric across endpoints:
-    DiskUsage: json.RawMessage(strconv.Quote(strconv.FormatUint(r.DiskUsage, 10))),
+    DiskUsage: json.RawMessage(strconv.FormatUint(r.DiskUsage, 10)),
Alternatively, switch AsyncSearchesListItem.MarshalJSON to the same string encoding if that's the intended contract.
pkg/seqproxyapi/v1/mappings.go (2)
86-91: Add lower-bound validation for robustness.
Other converters in this file (e.g., ToAggFunc) also check for negative values. Mirror that here to avoid panics on malformed inputs.
-func (s AsyncSearchStatus) ToAsyncSearchStatus() (fracmanager.AsyncSearchStatus, error) {
-    if int(s) >= len(statusMappingsPb) {
+func (s AsyncSearchStatus) ToAsyncSearchStatus() (fracmanager.AsyncSearchStatus, error) {
+    if s < 0 || int(s) >= len(statusMappingsPb) {
         return 0, fmt.Errorf("unknown status")
     }
     return statusMappingsPb[s], nil
 }
116-129: String-to-status map looks fine; consider documenting accepted values.
The accepted keys are the exact proto names. A short comment noting case-sensitivity and expected values helps avoid misuse.
tests/integration_tests/integration_test.go (1)
tests/integration_tests/integration_test.go (1)
2021-2022: Fragile equality on time-bearing request structs.
r.Equal(startReq, fresp.Request) may intermittently fail due to monotonic clock bits in time.Time. Prefer field-wise comparison and use Equal/WithinDuration for From/To.
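The difference between structural equality and time.Time.Equal can be reproduced in isolation. The sketch below uses two values for the same instant in different locations, which is a deterministic way to trigger the mismatch; a monotonic clock reading (present on time.Now() results and stripped by Round(0)) causes the same kind of structural difference. The helper names are made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

// sampleTimes returns the same instant expressed in two locations.
func sampleTimes() (time.Time, time.Time) {
	utc := time.Date(2025, 8, 5, 10, 0, 0, 0, time.UTC)
	shifted := utc.In(time.FixedZone("UTC+3", 3*60*60))
	return utc, shifted
}

// sameInstant compares by instant, ignoring location and monotonic reading.
func sameInstant(a, b time.Time) bool {
	return a.Equal(b)
}

func main() {
	a, b := sampleTimes()

	// Struct comparison also compares the location pointer, so it differs.
	fmt.Println(a == b)
	// Equal compares the instant only.
	fmt.Println(sameInstant(a, b))

	// Round(0) strips the monotonic reading; the instant is unchanged.
	now := time.Now()
	fmt.Println(sameInstant(now, now.Round(0)))
}
```

Deep-equality helpers that walk struct fields see the first kind of difference, which is why comparing From/To with Equal is the more robust assertion.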
- r.Equal(startReq, fresp.Request)
+ r.Equal(startReq.Query, fresp.Request.Query)
+ r.Equal(startReq.Retention, fresp.Request.Retention)
+ r.Equal(startReq.WithDocs, fresp.Request.WithDocs)
+ r.Equal(startReq.Aggregations, fresp.Request.Aggregations)
+ r.Equal(startReq.HistogramInterval, fresp.Request.HistogramInterval)
+ r.Truef(fresp.Request.From.Equal(startReq.From) && fresp.Request.To.Equal(startReq.To),
+     "request From/To differ: from=%v vs %v; to=%v vs %v",
+     startReq.From, fresp.Request.From, startReq.To, fresp.Request.To)
storeapi/grpc_async_search.go (3)
18-31: Clarify intent: WithTotal derived from WithDocs may be surprising.
Coupling WithTotal to WithDocs is non-obvious and possibly costly (computing totals is expensive). If the goal is only to persist docs for later fetch, consider decoupling these flags and document the trade-offs.
Also applies to: 32-41
57-66: Validate pagination inputs and guard against overflow.
Negative size/offset or overflow in size+offset can lead to incorrect limits. Fail fast with InvalidArgument and compute limit safely.
 func (g *GrpcV1) FetchAsyncSearchResult(
     _ context.Context,
     r *storeapi.FetchAsyncSearchResultRequest,
 ) (*storeapi.FetchAsyncSearchResultResponse, error) {
+    if r.Size < 0 || r.Offset < 0 {
+        return nil, status.Error(codes.InvalidArgument, "size and offset must be non-negative")
+    }
+    // Compute safe limit for pagination
+    limit := int(r.Size) + int(r.Offset)
+    if limit < 0 {
+        limit = math.MaxInt // overflow guard
+    }
     fr, exists := g.asyncSearcher.FetchSearchResult(fracmanager.FetchSearchResultRequest{
         ID: r.SearchId,
-        Limit: int(r.Size + r.Offset),
+        Limit: limit,
         Order: r.Order.MustDocsOrder(),
     })
112-130: GetAsyncSearchesList: LGTM; minor nit on panic safety.
MustAsyncSearchStatus will panic on unknown values. Consider using the error-returning variant and mapping to InvalidArgument instead.
proxyapi/grpc_async_search.go (1)
proxyapi/grpc_async_search.go (1)
155-166: Consider caching the conversion logic.
The makeProtoRequestAggregations function creates a new slice on every call. Since aggregation definitions are likely to be reused, consider whether caching could be beneficial for performance.
proxy/search/async.go (1)
54-55: Verify the TODO comment about WithDocs.
The comment indicates that WithDocs is hardcoded to false despite being passed in the request. This seems like a significant limitation that should be tracked.
Would you like me to create an issue to track enabling WithDocs after implementing async searches' QPRs merging in batches?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
- pkg/seqproxyapi/v1/seq_proxy_api.pb.go is excluded by !**/*.pb.go
- pkg/seqproxyapi/v1/seq_proxy_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/storeapi/store_api_vtproto.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (27)
- api/seqproxyapi/v1/seq_proxy_api.proto (4 hunks)
- api/storeapi/store_api.proto (2 hunks)
- cmd/seq-db/seq-db.go (1 hunks)
- config/config.go (1 hunks)
- frac/meta_data_collector.go (1 hunks)
- frac/processor/search_params.go (1 hunks)
- fracmanager/async_searcher.go (6 hunks)
- fracmanager/async_searcher_test.go (1 hunks)
- fracmanager/encoding.go (1 hunks)
- fracmanager/encoding_test.go (1 hunks)
- pkg/seqproxyapi/v1/mappings.go (2 hunks)
- pkg/seqproxyapi/v1/marshaler.go (1 hunks)
- pkg/seqproxyapi/v1/marshaler_test.go (1 hunks)
- pkg/storeapi/mappings.go (2 hunks)
- proxy/search/async.go (4 hunks)
- proxy/search/ingestor.go (2 hunks)
- proxy/search/mock/store_api_client_mock.go (2 hunks)
- proxyapi/grpc_async_search.go (3 hunks)
- proxyapi/grpc_v1.go (1 hunks)
- proxyapi/ingestor.go (1 hunks)
- proxyapi/mock/grpc_v1.go (3 hunks)
- seq/qpr.go (1 hunks)
- storeapi/client.go (1 hunks)
- storeapi/grpc_async_search.go (3 hunks)
- storeapi/grpc_v1.go (1 hunks)
- tests/integration_tests/integration_test.go (4 hunks)
- util/bufferpool.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- frac/meta_data_collector.go
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: forshev
PR: ozontech/seq-db#3
File: api/seqproxyapi/v1/seq_proxy_api.proto:88-90
Timestamp: 2025-08-05T10:52:04.737Z
Learning: In the seq-db project, the FetchAsyncSearchResult RPC method in api/seqproxyapi/v1/seq_proxy_api.proto was intentionally changed from HTTP GET to POST during development. This breaking change is acceptable because the async search feature is under active development and not yet in production use.
📚 Learning: 2025-08-05T10:52:04.737Z
Learnt from: forshev
PR: ozontech/seq-db#3
File: api/seqproxyapi/v1/seq_proxy_api.proto:88-90
Timestamp: 2025-08-05T10:52:04.737Z
Learning: In the seq-db project, the FetchAsyncSearchResult RPC method in api/seqproxyapi/v1/seq_proxy_api.proto was intentionally changed from HTTP GET to POST during development. This breaking change is acceptable because the async search feature is under active development and not yet in production use.
Applied to files:
- proxyapi/grpc_v1.go
- pkg/seqproxyapi/v1/marshaler.go
- api/seqproxyapi/v1/seq_proxy_api.proto
- storeapi/grpc_async_search.go
- api/storeapi/store_api.proto
- fracmanager/async_searcher.go
- proxy/search/async.go
- proxyapi/grpc_async_search.go
📚 Learning: 2025-08-05T10:51:10.548Z
Learnt from: forshev
PR: ozontech/seq-db#3
File: proxy/search/async.go:155-171
Timestamp: 2025-08-05T10:51:10.548Z
Learning: In proxy/search/async.go, the FetchAsyncSearchResult method uses an `anyResponse` variable to track if any successful responses were received from any shard. If all shards return NotFound errors, the function properly returns a NotFound status error, so individual shard-level response tracking is not necessary.
Applied to files:
- pkg/seqproxyapi/v1/marshaler_test.go
- api/seqproxyapi/v1/seq_proxy_api.proto
- storeapi/grpc_async_search.go
- api/storeapi/store_api.proto
- proxy/search/async.go
- tests/integration_tests/integration_test.go
- proxyapi/grpc_async_search.go
🧬 Code Graph Analysis (20)
util/bufferpool.go (1)
bytespool/bytespool.go (2)
Pool(87-98), Buffer(62-64)
fracmanager/async_searcher_test.go (5)
frac/fraction.go (2)
Fraction(18-24), DataProvider(13-16)
seq/qpr.go (1)
QPR(68-74)
frac/processor/search_params.go (1)
SearchParams(16-28)
fracmanager/async_searcher.go (3)
AsyncSearcherConfig(87-93), MustStartAsync(95-130), AsyncSearchRequest(132-137)
mappingprovider/mapping_provider.go (1)
WithMapping(26-31)
proxyapi/grpc_v1.go (3)
proxy/search/async.go (4)
FetchAsyncSearchResultRequest(80-85), FetchAsyncSearchResultResponse(87-101), GetAsyncSearchesListRequest(103-108), AsyncSearchesListItem(110-122)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (12)
FetchAsyncSearchResultRequest(1171-1184), FetchAsyncSearchResultRequest(1197-1197), FetchAsyncSearchResultRequest(1212-1214), FetchAsyncSearchResultResponse(1244-1258), FetchAsyncSearchResultResponse(1271-1271), FetchAsyncSearchResultResponse(1286-1288), GetAsyncSearchesListRequest(1506-1517), GetAsyncSearchesListRequest(1530-1530), GetAsyncSearchesListRequest(1545-1547), AsyncSearchesListItem(1621-1635), AsyncSearchesListItem(1648-1648), AsyncSearchesListItem(1663-1665)
fracmanager/async_searcher.go (2)
GetAsyncSearchesListRequest(927-930), AsyncSearchesListItem(932-952)
storeapi/grpc_v1.go (1)
fracmanager/async_searcher.go (1)
MustStartAsync(95-130)
proxy/search/ingestor.go (1)
seq/qpr.go (1)
IDSources(41-41)
pkg/seqproxyapi/v1/marshaler_test.go (1)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (21)
FetchAsyncSearchResultResponse(1244-1258), FetchAsyncSearchResultResponse(1271-1271), FetchAsyncSearchResultResponse(1286-1288), StartAsyncSearchRequest(1043-1059), StartAsyncSearchRequest(1072-1072), StartAsyncSearchRequest(1087-1089), SearchQuery(460-468), SearchQuery(481-481), SearchQuery(496-498), AggQuery(529-538), AggQuery(551-551), AggQuery(566-568), ComplexSearchResponse(949-961), ComplexSearchResponse(974-974), ComplexSearchResponse(989-991), Document(301-308), Document(321-321), Document(336-338), Histogram(415-420), Histogram(433-433), Histogram(448-450)
fracmanager/encoding_test.go (2)
seq/qpr.go (8)
QPR(68-74), ErrorSource(61-64), IDSources(41-41), AggregatableSamples(113-116), AggBin(108-111), SamplesContainer(256-267), IDSource(31-35), NewSamplesContainers(269-277)
seq/seq.go (4)
MID(17-17), ID(12-15), RID(18-18), NewID(115-119)
pkg/seqproxyapi/v1/marshaler.go (1)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (9)
StartAsyncSearchRequest(1043-1059), StartAsyncSearchRequest(1072-1072), StartAsyncSearchRequest(1087-1089), FetchAsyncSearchResultResponse(1244-1258), FetchAsyncSearchResultResponse(1271-1271), FetchAsyncSearchResultResponse(1286-1288), AsyncSearchesListItem(1621-1635), AsyncSearchesListItem(1648-1648), AsyncSearchesListItem(1663-1665)
storeapi/client.go (2)
pkg/storeapi/store_api.pb.go (18)
CancelAsyncSearchRequest(1076-1081), CancelAsyncSearchRequest(1094-1094), CancelAsyncSearchRequest(1109-1111), CancelAsyncSearchResponse(1120-1124), CancelAsyncSearchResponse(1137-1137), CancelAsyncSearchResponse(1152-1154), DeleteAsyncSearchRequest(1156-1161), DeleteAsyncSearchRequest(1174-1174), DeleteAsyncSearchRequest(1189-1191), DeleteAsyncSearchResponse(1200-1204), DeleteAsyncSearchResponse(1217-1217), DeleteAsyncSearchResponse(1232-1234), GetAsyncSearchesListRequest(1236-1242), GetAsyncSearchesListRequest(1255-1255), GetAsyncSearchesListRequest(1270-1272), GetAsyncSearchesListResponse(1288-1293), GetAsyncSearchesListResponse(1306-1306), GetAsyncSearchesListResponse(1321-1323)
storeapi/grpc_v1.go (1)
GrpcV1(83-98)
frac/processor/search_params.go (1)
parser/ast_node.go (1)
ASTNode(8-11)
proxy/search/mock/store_api_client_mock.go (2)
pkg/storeapi/store_api.pb.go (18)
CancelAsyncSearchRequest(1076-1081), CancelAsyncSearchRequest(1094-1094), CancelAsyncSearchRequest(1109-1111), CancelAsyncSearchResponse(1120-1124), CancelAsyncSearchResponse(1137-1137), CancelAsyncSearchResponse(1152-1154), DeleteAsyncSearchRequest(1156-1161), DeleteAsyncSearchRequest(1174-1174), DeleteAsyncSearchRequest(1189-1191), DeleteAsyncSearchResponse(1200-1204), DeleteAsyncSearchResponse(1217-1217), DeleteAsyncSearchResponse(1232-1234), GetAsyncSearchesListRequest(1236-1242), GetAsyncSearchesListRequest(1255-1255), GetAsyncSearchesListRequest(1270-1272), GetAsyncSearchesListResponse(1288-1293), GetAsyncSearchesListResponse(1306-1306), GetAsyncSearchesListResponse(1321-1323)
proxy/search/async.go (1)
GetAsyncSearchesListRequest(103-108)
fracmanager/encoding.go (3)
seq/qpr.go (7)
QPR(68-74), IDSources(41-41), IDSource(31-35), AggregatableSamples(113-116), AggBin(108-111), SamplesContainer(256-267), ErrorSource(61-64)
util/bufferpool.go (1)
BufferPool(9-11)
seq/seq.go (3)
MID(17-17), ID(12-15), RID(18-18)
pkg/storeapi/mappings.go (3)
fracmanager/async_searcher.go (5)
AsyncSearchStatus(627-627), AsyncSearchStatusDone(630-630), AsyncSearchStatusInProgress(631-631), AsyncSearchStatusError(632-632), AsyncSearchStatusCanceled(633-633)
pkg/storeapi/store_api.pb.go (8)
AsyncSearchStatus(186-186), AsyncSearchStatus(221-223), AsyncSearchStatus(225-227), AsyncSearchStatus(234-236), AsyncSearchStatus_AsyncSearchStatusDone(190-190), AsyncSearchStatus_AsyncSearchStatusInProgress(189-189), AsyncSearchStatus_AsyncSearchStatusError(192-192), AsyncSearchStatus_AsyncSearchStatusCanceled(191-191)
pkg/seqproxyapi/v1/mappings.go (2)
ToProtoAsyncSearchStatus(101-106), MustProtoAsyncSearchStatus(108-114)
proxyapi/mock/grpc_v1.go (2)
proxy/search/async.go (4)
FetchAsyncSearchResultRequest(80-85), FetchAsyncSearchResultResponse(87-101), GetAsyncSearchesListRequest(103-108), AsyncSearchesListItem(110-122)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (12)
FetchAsyncSearchResultRequest(1171-1184), FetchAsyncSearchResultRequest(1197-1197), FetchAsyncSearchResultRequest(1212-1214), FetchAsyncSearchResultResponse(1244-1258), FetchAsyncSearchResultResponse(1271-1271), FetchAsyncSearchResultResponse(1286-1288), GetAsyncSearchesListRequest(1506-1517), GetAsyncSearchesListRequest(1530-1530), GetAsyncSearchesListRequest(1545-1547), AsyncSearchesListItem(1621-1635), AsyncSearchesListItem(1648-1648), AsyncSearchesListItem(1663-1665)
pkg/seqproxyapi/v1/mappings.go (4)
fracmanager/async_searcher.go (5)
AsyncSearchStatus(627-627), AsyncSearchStatusDone(630-630), AsyncSearchStatusInProgress(631-631), AsyncSearchStatusError(632-632), AsyncSearchStatusCanceled(633-633)
pkg/storeapi/store_api.pb.go (8)
AsyncSearchStatus(186-186), AsyncSearchStatus(221-223), AsyncSearchStatus(225-227), AsyncSearchStatus(234-236), AsyncSearchStatus_AsyncSearchStatusDone(190-190), AsyncSearchStatus_AsyncSearchStatusInProgress(189-189), AsyncSearchStatus_AsyncSearchStatusError(192-192), AsyncSearchStatus_AsyncSearchStatusCanceled(191-191)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (8)
AsyncSearchStatus(189-189), AsyncSearchStatus(230-232), AsyncSearchStatus(234-236), AsyncSearchStatus(243-245), AsyncSearchStatus_AsyncSearchStatusDone(196-196), AsyncSearchStatus_AsyncSearchStatusInProgress(194-194), AsyncSearchStatus_AsyncSearchStatusError(201-201), AsyncSearchStatus_AsyncSearchStatusCanceled(198-198)
pkg/storeapi/mappings.go (2)
ToProtoAsyncSearchStatus(130-135), MustProtoAsyncSearchStatus(137-143)
storeapi/grpc_async_search.go (7)
storeapi/grpc_v1.go (1)
GrpcV1(83-98)
pkg/storeapi/store_api.pb.go (26)
StartAsyncSearchRequest(716-728), StartAsyncSearchRequest(741-741), StartAsyncSearchRequest(756-758), Order(88-88), Order(117-119), Order(121-123), Order(130-132), FetchAsyncSearchResultRequest(852-860), FetchAsyncSearchResultRequest(873-873), FetchAsyncSearchResultRequest(888-890), FetchAsyncSearchResultResponse(920-939), FetchAsyncSearchResultResponse(952-952), FetchAsyncSearchResultResponse(967-969), GetAsyncSearchesListRequest(1236-1242), GetAsyncSearchesListRequest(1255-1255), GetAsyncSearchesListRequest(1270-1272), GetAsyncSearchesListResponse(1288-1293), GetAsyncSearchesListResponse(1306-1306), GetAsyncSearchesListResponse(1321-1323), AsyncSearchStatus(186-186), AsyncSearchStatus(221-223), AsyncSearchStatus(225-227), AsyncSearchStatus(234-236), AsyncSearchesListItem(1332-1351), AsyncSearchesListItem(1364-1364), AsyncSearchesListItem(1379-1381)
frac/processor/search_params.go (1)
SearchParams(16-28)
seq/seq.go (2)
MID(17-17), ID(12-15)
seq/qpr.go (2)
DocsOrderDesc(19-19), QPR(68-74)
fracmanager/async_searcher.go (5)
AsyncSearchRequest(132-137), FetchSearchResultRequest(621-625), GetAsyncSearchesListRequest(927-930), AsyncSearchStatus(627-627), AsyncSearchesListItem(932-952)
pkg/storeapi/mappings.go (1)
MustProtoAsyncSearchStatus(137-143)
fracmanager/async_searcher.go (7)
storeapi/grpc_v1.go (1)
MappingProvider(24-26)fracmanager/list.go (1)
List(10-10)parser/seqql.go (1)
ParseSeqQL(28-58)frac/fraction.go (2)
Fraction(18-24)DataProvider(13-16)util/bufferpool.go (1)
BufferPool(9-11)seq/qpr.go (4)
QPR(68-74)DocsOrder(16-16)ErrorSource(61-64)MergeQPRs(355-400)bytespool/bytespool.go (2)
Acquire(22-25)Release(28-30)
proxy/search/async.go (5)
pkg/storeapi/store_api.pb.go (32)
AggQuery(342-351)AggQuery(364-364)AggQuery(379-381)Order(88-88)Order(117-119)Order(121-123)Order(130-132)AsyncSearchStatus(186-186)AsyncSearchStatus(221-223)AsyncSearchStatus(225-227)AsyncSearchStatus(234-236)FetchAsyncSearchResultRequest(852-860)FetchAsyncSearchResultRequest(873-873)FetchAsyncSearchResultRequest(888-890)FetchAsyncSearchResultResponse(920-939)FetchAsyncSearchResultResponse(952-952)FetchAsyncSearchResultResponse(967-969)GetAsyncSearchesListRequest(1236-1242)GetAsyncSearchesListRequest(1255-1255)GetAsyncSearchesListRequest(1270-1272)AsyncSearchesListItem(1332-1351)AsyncSearchesListItem(1364-1364)AsyncSearchesListItem(1379-1381)GetAsyncSearchesListResponse(1288-1293)GetAsyncSearchesListResponse(1306-1306)GetAsyncSearchesListResponse(1321-1323)CancelAsyncSearchRequest(1076-1081)CancelAsyncSearchRequest(1094-1094)CancelAsyncSearchRequest(1109-1111)DeleteAsyncSearchRequest(1156-1161)DeleteAsyncSearchRequest(1174-1174)DeleteAsyncSearchRequest(1189-1191)seq/qpr.go (3)
DocsOrder(16-16)QPR(68-74)MergeQPRs(355-400)fracmanager/async_searcher.go (7)
AsyncSearchStatus(627-627)AsyncSearchStatusDone(630-630)GetAsyncSearchesListRequest(927-930)AsyncSearchesListItem(932-952)AsyncSearchStatusInProgress(631-631)AsyncSearchStatusCanceled(633-633)AsyncSearchStatusError(632-632)proxy/search/ingestor.go (1)
Ingestor(36-41)pkg/storeapi/store_api_vtproto.pb.go (1)
StoreApiClient(1659-1669)
tests/integration_tests/integration_test.go (3)
proxy/search/async.go (3)
AsyncRequest(24-32)FetchAsyncSearchResultRequest(80-85)GetAsyncSearchesListRequest(103-108)seq/qpr.go (2)
QPR(68-74)ErrorSource(61-64)fracmanager/async_searcher.go (2)
AsyncSearchStatusDone(630-630)GetAsyncSearchesListRequest(927-930)
proxyapi/grpc_async_search.go (5)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (35)
StartAsyncSearchRequest(1043-1059)StartAsyncSearchRequest(1072-1072)StartAsyncSearchRequest(1087-1089)FetchAsyncSearchResultRequest(1171-1184)FetchAsyncSearchResultRequest(1197-1197)FetchAsyncSearchResultRequest(1212-1214)FetchAsyncSearchResultResponse(1244-1258)FetchAsyncSearchResultResponse(1271-1271)FetchAsyncSearchResultResponse(1286-1288)Order(143-143)Order(172-174)Order(176-178)Order(185-187)SearchQuery(460-468)SearchQuery(481-481)SearchQuery(496-498)HistQuery(606-611)HistQuery(624-624)HistQuery(639-641)ComplexSearchResponse(949-961)ComplexSearchResponse(974-974)ComplexSearchResponse(989-991)GetAsyncSearchesListRequest(1506-1517)GetAsyncSearchesListRequest(1530-1530)GetAsyncSearchesListRequest(1545-1547)AsyncSearchStatus(189-189)AsyncSearchStatus(230-232)AsyncSearchStatus(234-236)AsyncSearchStatus(243-245)AggQuery(529-538)AggQuery(551-551)AggQuery(566-568)AsyncSearchesListItem(1621-1635)AsyncSearchesListItem(1648-1648)AsyncSearchesListItem(1663-1665)seq/seq.go (3)
MID(17-17)ID(12-15)MIDToDuration(111-113)proxy/search/async.go (4)
FetchAsyncSearchResultRequest(80-85)FetchAsyncSearchResultResponse(87-101)GetAsyncSearchesListRequest(103-108)AsyncSearchesListItem(110-122)pkg/seqproxyapi/v1/mappings.go (1)
MustProtoAsyncSearchStatus(108-114)fracmanager/async_searcher.go (3)
GetAsyncSearchesListRequest(927-930)AsyncSearchStatus(627-627)AsyncSearchesListItem(932-952)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (27)
frac/processor/search_params.go (1)
16-18: Hiding AST from JSON is the right call
Omitting the parsed AST from JSON payloads prevents accidental large or cyclic encodes and keeps the API surface clean. Looks good.
cmd/seq-db/seq-db.go (1)
290-294: Guard against int overflow and confirm DataDir defaulting path
- Casting Bytes to int can overflow on 32-bit builds or with large limits. Either clamp before casting or prefer int64/uint64 in AsyncSearcherConfig.
- The comment in config hints DataDir defaults to a subdir of Storage.DataDir; here we pass it through as-is. Please confirm fracmanager applies that default when empty to avoid writing in CWD.
If keeping int, consider clamping locally:
    // helper
    func clampToInt[T ~int64 | ~uint64](v T) int {
        if v > T(math.MaxInt) {
            return math.MaxInt
        }
        return int(v)
    }

    // use
    MaxSize:           clampToInt(cfg.AsyncSearch.MaxTotalSize),
    MaxSizePerRequest: clampToInt(cfg.AsyncSearch.MaxSizePerRequest),
seq/qpr.go (1)
372-377: Init guard change avoids needless allocs; semantics preserved
Switching to len checks prevents allocating zero-length slices and keeps behavior intact. Good micro-fix.
proxyapi/mock/grpc_v1.go (2)
44-70: New Cancel/Delete async-search mocks look correct
Signatures align with the expanded interface. No issues from me.
103-116: List API mock addition matches the new surface
GetAsyncSearchesList mock and recorder look aligned with the request/response types.
api/storeapi/store_api.proto (2)
20-25: New async-search lifecycle RPCs look good
Cancel/Delete/List expand the lifecycle surface cleanly and match the broader PR scope.
201-208: Filters for GetAsyncSearchesList look reasonable
An optional status plus an ids filter is a pragmatic start. Presence on enum is fine in proto3.
proxyapi/ingestor.go (1)
223-225: Unmarshal added, completing the Marshaler contract
This makes humanReadableMarshaler bidirectional with JSONBuiltin. Good.
proxy/search/ingestor.go (1)
137-138: Good extraction: paginateIDs moved to package-level
Reduces receiver coupling; call site updated correctly.
pkg/storeapi/mappings.go (1)
100-113: LGTM: bidirectional status mappings are consistent with enum values
The mapping aligns fracmanager statuses [Done, InProgress, Error, Canceled] to proto enum values [Done=1, InProgress=0, Error=3, Canceled=2], and the reverse builder is correct.
storeapi/client.go (1)
41-51: LGTM: thin client forwarders mirror new RPCs
Straight-through delegation matches existing pattern; ignoring grpc.CallOption is consistent with other methods in this file.
pkg/seqproxyapi/v1/marshaler.go (2)
223-225: StartAsyncSearchRequest JSON marshalling: LGTM
Delegating to the shared protojson marshaller with EmitDefaultValues=true is consistent with the rest of the package.
258-260: AsyncSearchesListItem JSON marshalling: LGTM
Using the shared protojson marshaller keeps consistency for list items.
pkg/seqproxyapi/v1/mappings.go (1)
71-76: Status mappings between fracmanager and proto: LGTM
Index-based mapping aligns with the declared enum orders in both systems.
Also applies to: 78-85
proxyapi/grpc_v1.go (1)
33-37: Interface surface update: LGTM; verify implementers/mocks are updated.
New async-search methods and the FetchAsyncSearchResult signature change are coherent with the broader API changes. Ensure all SearchIngestor implementations and mocks are updated to satisfy the new interface.
tests/integration_tests/integration_test.go (2)
1947-1976: Async-search happy-path coverage: LGTM
End-to-end start→fetch flow validations (status, exp/agg/hist/progress) are solid and representative.
2042-2052: List/delete flow assertions: LGTM
Validates ordering, status, request echo, expiry and progress; delete is asserted via list shrinking. Good coverage.
storeapi/grpc_async_search.go (2)
96-102: CancelAsyncSearch/DeleteAsyncSearch: LGTM
Idempotent behavior is fine for now. If needed later, consider surfacing NotFound for unknown IDs.
150-179: convertAsyncSearchesToProto: LGTM
Clean translation of fields, including conditional canceled_at and duration/timestamps.
proxy/search/mock/store_api_client_mock.go (3)
60-78: LGTM!
The mock implementation for CancelAsyncSearch follows the established pattern correctly.
80-98: LGTM!
The mock implementation for DeleteAsyncSearch is consistent with other mock methods.
140-158: LGTM!
The mock implementation for GetAsyncSearchesList follows the established pattern correctly.
fracmanager/encoding.go (1)
424-426: Fix the status weight mapping.
The statusWeight slice is being used incorrectly. You're trying to use the status values as indices, but the status values are 0-3, not 1-4. This will cause a panic or incorrect behavior.
- statusWeight := []fracmanager.AsyncSearchStatus{
-     fracmanager.AsyncSearchStatusDone:       1,
-     fracmanager.AsyncSearchStatusInProgress: 2,
-     fracmanager.AsyncSearchStatusCanceled:   3,
-     fracmanager.AsyncSearchStatusError:      4,
- }
- weightA := statusWeight[a]
- weightB := statusWeight[b]
+ statusWeight := map[fracmanager.AsyncSearchStatus]int{
+     fracmanager.AsyncSearchStatusDone:       1,
+     fracmanager.AsyncSearchStatusInProgress: 2,
+     fracmanager.AsyncSearchStatusCanceled:   3,
+     fracmanager.AsyncSearchStatusError:      4,
+ }
+ weightA := statusWeight[a]
+ weightB := statusWeight[b]
Likely an incorrect or invalid review comment.
proxyapi/grpc_async_search.go (2)
27-33: Validate histogram interval parsing.
Good error handling for histogram interval parsing.
56-61: LGTM!
The streaming approach for fetching async search results with docs iterator is well-designed.
proxy/search/async.go (1)
267-275: LGTM!
The pagination logic for IDs is correctly implemented with proper offset and size handling.
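For reference, offset/size pagination over a list of IDs can be sketched as below. This is a minimal illustration rather than the project's paginateIDs: the paginate helper, its signature, and its clamping behavior are assumptions made for the example.

```go
package main

import "fmt"

// paginate returns the window of ids selected by offset and size.
// Hypothetical helper for illustration only; it is not the project's paginateIDs.
func paginate(ids []string, offset, size int) []string {
	// Reject windows that start outside the slice or have no width.
	if offset < 0 || size <= 0 || offset >= len(ids) {
		return nil
	}
	// Clamp the window so it never runs past the end of the slice.
	if remaining := len(ids) - offset; size > remaining {
		size = remaining
	}
	return ids[offset : offset+size]
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}
	fmt.Println(paginate(ids, 0, 2))  // first page
	fmt.Println(paginate(ids, 4, 10)) // last, partially filled page
	fmt.Println(paginate(ids, 5, 1))  // offset past the end
}
```

Clamping size against the remaining length, instead of computing offset+size first, avoids integer overflow when a caller passes a very large size.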
fracmanager/async_searcher.go (1)
169-188: Avoid potential data race on atomic fields
The merged, qprsSize, and infoSize atomic fields are being stored directly in the map, but the asyncSearchInfo struct is passed by value in several places. This could lead to copying the atomic values, which is not safe.
Consider either:
- Store pointers to asyncSearchInfo in the map instead of values
- Make the atomic fields regular fields protected by the mutex
Here's the recommended approach to store pointers:
-requests map[string]asyncSearchInfo
+requests map[string]*asyncSearchInfo
And update the constructor:
-func newAsyncSearchInfo(r AsyncSearchRequest, list List) asyncSearchInfo {
+func newAsyncSearchInfo(r AsyncSearchRequest, list List) *asyncSearchInfo {
     fracsToSearch := make([]fracSearchState, 0, len(list))
     for _, f := range list {
         fracsToSearch = append(fracsToSearch, fracSearchState{Name: f.Info().Name()})
     }
     ctx, cancel := context.WithCancel(context.Background())
-    return asyncSearchInfo{
+    return &asyncSearchInfo{
         Finished:   false,
         Error:      "",
         CanceledAt: time.Time{},
         ctx:        ctx,
         cancel:     cancel,
         Request:    r,
         Fractions:  fracsToSearch,
         StartedAt:  time.Now(),
         merged:     &atomic.Bool{},
         qprsSize:   &atomic.Int64{},
         infoSize:   &atomic.Int64{},
     }
 }
Likely an incorrect or invalid review comment.
f87bc21 to
3023ae8
Compare
♻️ Duplicate comments (10)
api/seqproxyapi/v1/seq_proxy_api.proto (2)
278-289: Well-structured async search status enum
The status enum provides clear state transitions with helpful documentation. Consider adding a pending state, per the past review suggestion, if scheduling becomes relevant.
267-276: Document pagination parameter behavior when with_docs=false
The comments indicate these fields are ignored when with_docs is false, but server-side validation would be clearer than silently ignoring them.
api/storeapi/store_api.proto (3)
with_docsis false, but server-side validation would be clearer than silent ignoring.api/storeapi/store_api.proto (3)
142-151: Client-supplied search_id creates security risks
The client provides the search_id, which shifts uniqueness responsibility to the client and could enable ID collision attacks.
174-174: Remove redundant 'optional' on message-typed field
Proto3 message fields already track presence; the optional modifier is unnecessary and may fail to compile with older protoc releases that lack proto3 optional support.
- optional google.protobuf.Timestamp canceled_at = 5;
+ google.protobuf.Timestamp canceled_at = 5;
215-215: Remove redundant 'optional' on canceled_at field
Same issue as above: optional is unnecessary on message-typed fields.
- optional google.protobuf.Timestamp canceled_at = 5;
+ google.protobuf.Timestamp canceled_at = 5;
fracmanager/async_searcher.go (5)
270-279: Potential goroutine leak from struct copying
The updateSearchInfo function updates a copy of asyncSearchInfo, which contains the ctx and cancel fields. This breaks context cancellation propagation.
288-298: Atomic fields won't serialize correctly to JSON
The asyncSearchInfo struct contains atomic pointer fields that won't marshal/unmarshal properly, causing issues when loading from disk.
419-431: Non-atomic disk usage calculation
Reading two atomic values separately can yield inconsistent results. Load atomics once into locals before computing.
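The "load once into locals" suggestion can be sketched as follows. The searchSizes type and diskUsage method are hypothetical stand-ins for the real fields, and the sketch assumes Go 1.19+ for atomic.Int64. Note that two independent loads still do not form a joint snapshot; if the pair must be consistent with each other, a mutex around both updates and reads would be needed.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// searchSizes is a hypothetical stand-in for the size counters on a search.
type searchSizes struct {
	qprsSize atomic.Int64
	infoSize atomic.Int64
}

// diskUsage loads each counter exactly once and derives the total from the
// locals, so the reported parts always add up to the reported total.
func (s *searchSizes) diskUsage() (qprs, info, total int64) {
	qprs = s.qprsSize.Load()
	info = s.infoSize.Load()
	return qprs, info, qprs + info
}

func main() {
	s := &searchSizes{}
	s.qprsSize.Store(100)
	s.infoSize.Store(20)
	fmt.Println(s.diskUsage())
}
```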
578-589: Missing context initialization after unmarshaling
A loaded asyncSearchInfo has nil ctx and cancel after unmarshaling, causing nil pointer dereferences.
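One way to address this is to rebuild the runtime-only fields right after decoding. The sketch below uses a simplified, hypothetical searchInfo type and loadSearchInfo helper rather than the real asyncSearchInfo; it relies only on the fact that encoding/json skips unexported fields, so they are always zero after json.Unmarshal.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// searchInfo is a simplified, hypothetical version of the persisted state.
type searchInfo struct {
	Finished bool
	Error    string

	// Runtime-only state: unexported, so encoding/json never fills it in.
	ctx      context.Context
	cancel   context.CancelFunc
	qprsSize *atomic.Int64
}

// loadSearchInfo decodes persisted state and re-creates the runtime fields
// that would otherwise be nil after unmarshaling.
func loadSearchInfo(data []byte) (*searchInfo, error) {
	info := &searchInfo{}
	if err := json.Unmarshal(data, info); err != nil {
		return nil, err
	}
	info.ctx, info.cancel = context.WithCancel(context.Background())
	info.qprsSize = &atomic.Int64{}
	return info, nil
}

func main() {
	info, err := loadSearchInfo([]byte(`{"Finished":false,"Error":""}`))
	if err != nil {
		panic(err)
	}
	info.cancel() // safe: cancel is non-nil after loading
	fmt.Println(info.ctx.Err())
}
```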
843-849: Race condition during lock release and reacquisition
Releasing the read lock to call updateSearchInfo allows map mutations between operations.
🧹 Nitpick comments (1)
tests/integration_tests/integration_test.go (1)
1979-1989: Consider adding a failure message to the Eventually assertion
The Eventually call already bounds polling at 10 seconds, so it cannot hang indefinitely, but a timeout currently fails without context. Consider adding an explicit error message for when the timeout occurs.
- r.Eventually(func() bool {
-     resp, _, err := searcher.FetchAsyncSearchResult(ctx, freq)
-     r.NoError(err)
-     return resp.Status == fracmanager.AsyncSearchStatusDone
- }, 10*time.Second, 50*time.Millisecond)
+ r.Eventually(func() bool {
+     resp, _, err := searcher.FetchAsyncSearchResult(ctx, freq)
+     r.NoError(err)
+     return resp.Status == fracmanager.AsyncSearchStatusDone
+ }, 10*time.Second, 50*time.Millisecond, "async search did not complete within timeout")
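To illustrate what a bounded polling assertion with a descriptive failure looks like, here is a standard-library-only sketch. The eventually helper is hypothetical and only mirrors the shape of the call above (condition, wait time, tick, message); it is not the testify implementation.

```go
package main

import (
	"fmt"
	"time"
)

// eventually polls cond every tick until it returns true or waitFor elapses.
// On timeout it returns an error carrying msg, so the failure explains itself.
// Hypothetical helper for illustration only.
func eventually(cond func() bool, waitFor, tick time.Duration, msg string) error {
	deadline := time.Now().Add(waitFor)
	for {
		if cond() {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("%s (waited %s)", msg, waitFor)
		}
		time.Sleep(tick)
	}
}

func main() {
	calls := 0
	err := eventually(func() bool {
		calls++
		return calls >= 3 // pretend the search is done on the third poll
	}, time.Second, time.Millisecond, "async search did not complete within timeout")
	fmt.Println(err, calls)
}
```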
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
- pkg/seqproxyapi/v1/seq_proxy_api.pb.go is excluded by !**/*.pb.go
- pkg/seqproxyapi/v1/seq_proxy_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.go is excluded by !**/*.pb.go
- pkg/storeapi/store_api.pb.gw.go is excluded by !**/*.pb.gw.go
- pkg/storeapi/store_api_vtproto.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (27)
- api/seqproxyapi/v1/seq_proxy_api.proto (4 hunks)
- api/storeapi/store_api.proto (2 hunks)
- cmd/seq-db/seq-db.go (1 hunks)
- config/config.go (1 hunks)
- frac/meta_data_collector.go (1 hunks)
- frac/processor/search_params.go (1 hunks)
- fracmanager/async_searcher.go (6 hunks)
- fracmanager/async_searcher_test.go (1 hunks)
- fracmanager/encoding.go (1 hunks)
- fracmanager/encoding_test.go (1 hunks)
- pkg/seqproxyapi/v1/mappings.go (2 hunks)
- pkg/seqproxyapi/v1/marshaler.go (1 hunks)
- pkg/seqproxyapi/v1/marshaler_test.go (1 hunks)
- pkg/storeapi/mappings.go (2 hunks)
- proxy/search/async.go (4 hunks)
- proxy/search/ingestor.go (2 hunks)
- proxy/search/mock/store_api_client_mock.go (2 hunks)
- proxyapi/grpc_async_search.go (3 hunks)
- proxyapi/grpc_v1.go (1 hunks)
- proxyapi/ingestor.go (1 hunks)
- proxyapi/mock/grpc_v1.go (3 hunks)
- seq/qpr.go (1 hunks)
- storeapi/client.go (1 hunks)
- storeapi/grpc_async_search.go (3 hunks)
- storeapi/grpc_v1.go (1 hunks)
- tests/integration_tests/integration_test.go (4 hunks)
- util/bufferpool.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (22)
- util/bufferpool.go
- proxyapi/ingestor.go
- proxy/search/mock/store_api_client_mock.go
- frac/meta_data_collector.go
- cmd/seq-db/seq-db.go
- frac/processor/search_params.go
- config/config.go
- storeapi/grpc_v1.go
- storeapi/grpc_async_search.go
- fracmanager/encoding_test.go
- pkg/seqproxyapi/v1/marshaler.go
- fracmanager/encoding.go
- seq/qpr.go
- proxyapi/mock/grpc_v1.go
- storeapi/client.go
- proxyapi/grpc_v1.go
- pkg/seqproxyapi/v1/mappings.go
- pkg/seqproxyapi/v1/marshaler_test.go
- proxyapi/grpc_async_search.go
- fracmanager/async_searcher_test.go
- proxy/search/ingestor.go
- pkg/storeapi/mappings.go
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: forshev
PR: ozontech/seq-db#3
File: api/seqproxyapi/v1/seq_proxy_api.proto:88-90
Timestamp: 2025-08-05T10:52:04.737Z
Learning: In the seq-db project, the FetchAsyncSearchResult RPC method in api/seqproxyapi/v1/seq_proxy_api.proto was intentionally changed from HTTP GET to POST during development. This breaking change is acceptable because the async search feature is under active development and not yet in production use.
📚 Learning: 2025-08-05T10:52:04.737Z
Learnt from: forshev
PR: ozontech/seq-db#3
File: api/seqproxyapi/v1/seq_proxy_api.proto:88-90
Timestamp: 2025-08-05T10:52:04.737Z
Learning: In the seq-db project, the FetchAsyncSearchResult RPC method in api/seqproxyapi/v1/seq_proxy_api.proto was intentionally changed from HTTP GET to POST during development. This breaking change is acceptable because the async search feature is under active development and not yet in production use.
Applied to files:
- api/seqproxyapi/v1/seq_proxy_api.proto
- proxy/search/async.go
- api/storeapi/store_api.proto
📚 Learning: 2025-08-05T10:51:10.548Z
Learnt from: forshev
PR: ozontech/seq-db#3
File: proxy/search/async.go:155-171
Timestamp: 2025-08-05T10:51:10.548Z
Learning: In proxy/search/async.go, the FetchAsyncSearchResult method uses an `anyResponse` variable to track if any successful responses were received from any shard. If all shards return NotFound errors, the function properly returns a NotFound status error, so individual shard-level response tracking is not necessary.
Applied to files:
- api/seqproxyapi/v1/seq_proxy_api.proto
- tests/integration_tests/integration_test.go
- proxy/search/async.go
- api/storeapi/store_api.proto
🧬 Code Graph Analysis (3)
fracmanager/async_searcher.go (7)
storeapi/grpc_v1.go (1)
MappingProvider(24-26)fracmanager/list.go (1)
List(10-10)parser/seqql.go (1)
ParseSeqQL(28-58)frac/fraction.go (2)
Fraction(18-24)DataProvider(13-16)util/bufferpool.go (1)
BufferPool(9-11)seq/qpr.go (4)
QPR(68-74)DocsOrder(16-16)ErrorSource(61-64)MergeQPRs(355-400)bytespool/bytespool.go (2)
Acquire(22-25)Release(28-30)
tests/integration_tests/integration_test.go (3)
proxy/search/async.go (3)
AsyncRequest(24-32)FetchAsyncSearchResultRequest(80-85)GetAsyncSearchesListRequest(103-108)seq/qpr.go (2)
QPR(68-74)ErrorSource(61-64)fracmanager/async_searcher.go (2)
AsyncSearchStatusDone(630-630)GetAsyncSearchesListRequest(927-930)
proxy/search/async.go (6)
pkg/storeapi/store_api.pb.go (32)
AggQuery(342-351)AggQuery(364-364)AggQuery(379-381)Order(88-88)Order(117-119)Order(121-123)Order(130-132)AsyncSearchStatus(186-186)AsyncSearchStatus(221-223)AsyncSearchStatus(225-227)AsyncSearchStatus(234-236)FetchAsyncSearchResultRequest(852-860)FetchAsyncSearchResultRequest(873-873)FetchAsyncSearchResultRequest(888-890)FetchAsyncSearchResultResponse(920-939)FetchAsyncSearchResultResponse(952-952)FetchAsyncSearchResultResponse(967-969)GetAsyncSearchesListRequest(1236-1242)GetAsyncSearchesListRequest(1255-1255)GetAsyncSearchesListRequest(1270-1272)AsyncSearchesListItem(1332-1351)AsyncSearchesListItem(1364-1364)AsyncSearchesListItem(1379-1381)GetAsyncSearchesListResponse(1288-1293)GetAsyncSearchesListResponse(1306-1306)GetAsyncSearchesListResponse(1321-1323)CancelAsyncSearchRequest(1076-1081)CancelAsyncSearchRequest(1094-1094)CancelAsyncSearchRequest(1109-1111)DeleteAsyncSearchRequest(1156-1161)DeleteAsyncSearchRequest(1174-1174)DeleteAsyncSearchRequest(1189-1191)proxy/search/search_request.go (1)
AggQuery(9-15)seq/qpr.go (4)
DocsOrder(16-16)QPR(68-74)ErrorSource(61-64)MergeQPRs(355-400)fracmanager/async_searcher.go (7)
AsyncSearchStatus(627-627)AsyncSearchStatusDone(630-630)GetAsyncSearchesListRequest(927-930)AsyncSearchesListItem(932-952)AsyncSearchStatusInProgress(631-631)AsyncSearchStatusCanceled(633-633)AsyncSearchStatusError(632-632)proxy/search/ingestor.go (1)
Ingestor(36-41)pkg/storeapi/store_api_vtproto.pb.go (1)
StoreApiClient(1659-1669)
🔇 Additional comments (5)
tests/integration_tests/integration_test.go (2)
1951-1971: Async search request structure looks good
The construction of startReq with retention, aggregations, and histogram interval is well-structured and properly uses UTC timestamps.
2042-2063: Comprehensive list validation
Good test coverage for GetAsyncSearchesList, including sorting order verification and cross-field consistency checks.
proxy/search/async.go (3)
41-44: Missing error handling for getAsyncSearchStores
Good addition of error checking here. The error properly propagates up the call stack.
423-435: Inconsistent status weight indexing
The statusWeight slice uses status values as array indices, but the values are 1-4 while the status constants are 0-3. This causes an out-of-bounds access.
Fix the weight map to use proper indices:
 func mergeAsyncSearchStatus(a, b fracmanager.AsyncSearchStatus) fracmanager.AsyncSearchStatus {
-    statusWeight := []fracmanager.AsyncSearchStatus{
-        fracmanager.AsyncSearchStatusDone:       1,
-        fracmanager.AsyncSearchStatusInProgress: 2,
-        fracmanager.AsyncSearchStatusCanceled:   3,
-        fracmanager.AsyncSearchStatusError:      4,
-    }
-    weightA := statusWeight[a]
-    weightB := statusWeight[b]
+    statusWeight := map[fracmanager.AsyncSearchStatus]int{
+        fracmanager.AsyncSearchStatusDone:       1,
+        fracmanager.AsyncSearchStatusInProgress: 2,
+        fracmanager.AsyncSearchStatusCanceled:   3,
+        fracmanager.AsyncSearchStatusError:      4,
+    }
+    weightA := statusWeight[a]
+    weightB := statusWeight[b]
     if weightA >= weightB {
         return a
     }
     return b
 }
Likely an incorrect or invalid review comment.
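A short sketch of why this comment is likely invalid: in a Go slice literal, key: value means "element at index key has this value", so the constants act as indices and 1-4 are the stored weights. The status type and constant order below are assumptions for illustration and are not copied from fracmanager.

```go
package main

import "fmt"

// status is a hypothetical enum; the real constant order may differ.
type status int

const (
	statusDone status = iota
	statusInProgress
	statusCanceled
	statusError
)

// mergeStatus keeps whichever status carries the higher weight.
func mergeStatus(a, b status) status {
	// Keyed slice literal: each key is an index, each value a weight.
	// The slice length is the largest key plus one, so every status
	// constant listed here is a valid index.
	weight := []int{
		statusDone:       1,
		statusInProgress: 2,
		statusCanceled:   3,
		statusError:      4,
	}
	if weight[a] >= weight[b] {
		return a
	}
	return b
}

func main() {
	fmt.Println(mergeStatus(statusDone, statusError) == statusError)
	fmt.Println(mergeStatus(statusCanceled, statusInProgress) == statusCanceled)
}
```

A map would behave the same for these inputs; the keyed slice simply avoids hashing, at the cost of being less obvious to readers unfamiliar with the syntax.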
148-177: Potential goroutine leak in concurrent shard processing
The goroutines started here don't check for context cancellation inside the loop. If the context is cancelled while iterating replicas, the goroutines will continue running unnecessarily.
Add context checking inside the replica loop:
 go func(shard []string) {
     defer wg.Done()
     for _, replica := range shard {
+        select {
+        case <-storesCtx.Done():
+            return
+        default:
+        }
         storeResp, err := si.clients[replica].FetchAsyncSearchResult(storesCtx, &req)
⛔ Skipped due to learnings
Learnt from: forshev
PR: ozontech/seq-db#3
File: proxy/search/async.go:155-171
Timestamp: 2025-08-05T10:51:10.548Z
Learning: In proxy/search/async.go, the FetchAsyncSearchResult method uses an `anyResponse` variable to track if any successful responses were received from any shard. If all shards return NotFound errors, the function properly returns a NotFound status error, so individual shard-level response tracking is not necessary.
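Although this suggestion was skipped, the pattern it describes (a non-blocking cancellation check before each replica attempt) can be sketched in isolation. The fetchFromShard and demo functions and the fetch callback are hypothetical; the real code calls a store client, not a callback.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fetchFromShard tries each replica in order and stops as soon as ctx is done.
// fetch is a hypothetical stand-in for the per-replica RPC.
func fetchFromShard(ctx context.Context, replicas []string, fetch func(context.Context, string) error) (string, error) {
	lastErr := errors.New("shard has no replicas")
	for _, replica := range replicas {
		// Non-blocking cancellation check before each attempt.
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		default:
		}
		if err := fetch(ctx, replica); err != nil {
			lastErr = err
			continue // fall through to the next replica
		}
		return replica, nil
	}
	return "", lastErr
}

// demo cancels the context during the first attempt and reports how many
// replicas were contacted before the loop stopped.
func demo() (attempts int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fetch := func(_ context.Context, _ string) error {
		attempts++
		cancel() // simulate the caller giving up mid-iteration
		return errors.New("replica unavailable")
	}
	_, err = fetchFromShard(ctx, []string{"r1", "r2", "r3"}, fetch)
	return attempts, err
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err)
}
```

In practice an RPC that receives the same context will usually fail fast once it is cancelled, so the explicit check mainly saves the cost of issuing calls that are already doomed.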
Summary by CodeRabbit
New Features
Improvements